2

samza タスクの 1 つに不可解な問題があります。1 つのパーティションのメッセージを除いて、正しく動作します。このトピックには 9 つのパーティションがあります。1000 件のメッセージを送信すると、約 890 件しか受信しません。

samza ジョブによって処理されないことがわかっているパーティション キーを使用して kafka-console-consumer を確認しましたが、コンソール コンシューマーにメッセージが表示されるため、トピックに書き込まれていることがわかり、少なくともバニラ コンシューマーはそれをうまく見てください。

samza でデバッグ ログを有効にしましたが、次のような多くのメッセージが表示されorg.apache.samza.checkpoint.kafka.KafkaCheckpointManagerます。

チェックポイントの追加 チェックポイント [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}] for taskName パーティション 4

パーティション 4 は常に 448 と表示されます。パーティション 0 にも同様のログがありますが、448 と表示されている場合は着実に増加しています。

これを絞り込むのに役立つ興味深い構成情報を喜んで共有しますが、今のところ、何を共有するかについて少し戸惑っています.

私は次のように実行しThreadJobFactoryています:

  • samza-kafka_2.10 バージョン 0.9.1

  • クライアント上の kafka_2.10 バージョン 0.8.2.1

  • カフカブローカー 0.9.0.0

アップデート

同じパーティション キーを使用してアップストリームの samza ジョブを調べたところ、アップストリームのパーティション 4 で問題が見つかりました。kafkacat で samza チェックポイントのトピックを確認すると、パーティション 4 のチェックポイントが進んでいないことがわかります。最初に私が見る:

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47135","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62263","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52151","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58081","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47712","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45831","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713

それから 1 分後、次のように表示されます。

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47115","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62252","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52134","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58063","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47696","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45817","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722

数値は 2556 を超えません。ただし、resource.mutationパーティション 4 の実際のトピックを見ると、最後のオフセットの範囲は他のものと同様で、現在は約 61000 であり、増加しています。

エラー メッセージや警告メッセージはまったくありません。パーティション 4 からの消費を停止するだけです。

4

1 に答える 1

2

max.message.bytes問題は、kafka コンシューマのデフォルトを超えるメッセージがあったことでした。しかし、何らかのエラー メッセージを表示するのではなく、そのパーティションの使用を担当するスレッドは、単にそのメッセージにハングアップします。他のパーティション スレッドは問題なく続行されます。

systems.kafka.consumer.fetch.message.max.bytesパーティション上の各メッセージを消費するのに十分な大きさの値に設定してジョブを再開すると、中断したところから再開され、すべてが期待どおりに機能し始めました。

于 2016-04-29T21:27:53.593 に答える