Kafka (ver 0.82) から avro ペイロードを取得し、HDFS に .avro ファイルとして保存するために、約 1 年間 Camus を実行してきました。いくつかの Kafka トピックを使用しています。最近、社内の新しいチームが、運用前環境で約 60 の新しいトピックを登録し、これらのトピックにデータを送信し始めました。チームは、データを kafka トピックにルーティングするときにいくつかの間違いを犯しました。これにより、Camus がこれらのトピックの avro にペイロードを逆シリアル化したときにエラーが発生しました。「その他の失敗」エラーしきい値を超えたため、Camus ジョブは失敗しました。失敗後の Camus の動作は驚くべきものでした。他の開発者に確認して、観察した動作が予期されたものなのか、それとも実装に問題があるのかを確認したかったのです。
「その他の失敗」しきい値を超えたために Camus ジョブが失敗したときに、この動作に気付きました。 1. すべてのマッパー タスクが成功したため、TaskAttempt のコミットが許可されました。最終的な HDFS の場所。2. CamusJob は、% エラー率を計算するときに例外をスローし (これはマッパー コミットに続いています)、ジョブが失敗しました 3. ジョブが失敗したため (私が思うに)、Kafka オフセットは進みませんでした
この動作で遭遇した問題は、Camus ジョブが 5 分ごとに実行されるように設定されていることです。そのため、データが HDFS にコミットされ、ジョブが失敗し、Kafka オフセットが更新されていないことを 5 分ごとに確認しました。これは、ディスクがいっぱいになるまで重複データを書き込んだことを意味します。
結果を確認する統合テストを作成しました。トピックに 10 個の適切なレコードを送信し、同じトピックに予期しないスキーマを使用する 10 個のレコードを送信し、そのトピックのみをホワイトリストに登録して Camus ジョブを実行し、10 個のレコードがHDFS に書き込まれ、Kafka オフセットは高度ではありません。以下は、そのテストのログのスニペットと、ジョブの実行中に使用したプロパティです。
これが Camus の予期された動作なのか、実装に問題があるのか、この動作 (データの複製) を防ぐための最善の方法は何かわかりません。
ありがとう〜マット
テストの CamusJob プロパティ:
etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false
mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3
kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true
kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10
etl.daily=daily
etl.ignore.schema.errors=false
etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2
マッパーが成功した後のコミット動作と、「その他」のしきい値を超えたために後続のジョブが失敗したことを示す、テストのログ スニペット:
LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing
[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map
[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now
[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0
[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro
[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro
[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro
[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] - map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read: 117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations: 0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations: 0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records: 15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records: 0
[CamusJob] - Failed Shuffles: 0
[CamusJob] - Merged Map outputs: 0
[CamusJob] - GC time elapsed (ms): 13
[CamusJob] - Total committed heap usage (bytes): 251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%