2

spring-kafka および spring-kafka-test バージョン 1.0.2.RELEASE を使用しています。

私のテストの 1 つで、私のアプリケーションは、KafkaTemplate とほとんどデフォルトの構成設定を使用して、EmbeddedKafka インスタンスの単一の TopicPartion に 100 レコードを連続して送信します。

KafkaTestUtils.getRecords(consumer) メソッドを使用して、Kafka インスタンスからレコードを取得し、それらがすべて送信されたことを確認します。

初めて getRecords を呼び出したとき、1 つのレコードしか受け取りません。もう一度呼び出すと、残りの 99 が得られます。

コンシューマーの位置を TopicPartition の先頭に明示的に設定してから getRecords を呼び出すと、すべて 100 になります。

getRecords が最初に 1 つのレコードしか取得しないのはなぜですか? コンシューマーで seekToBeginning を明示的に呼び出して、一度に 100 個すべてを取得するより良い方法はありますか?

4

2 に答える 2

2

これはタイミングの問題のように聞こえます。最初に呼び出したときに 1 つのメッセージしか利用できなかった可能性はpoll()十分にあります。そのメソッドは、フェッチされるメッセージの数を保証しません。コードを書くとき、一度に X 個のレコードを受け取ると想定すべきではありません。Kafka 0.10 のコンシューマ プロパティがありmax.poll.records、テスト目的で 1 に設定し、100 個すべてをポーリングするまで受信ループを実行することができます。

于 2016-08-13T11:30:59.513 に答える
0

ほとんどの場合、単なる競合状態です。コンシューマーが待機してpoll()おり、ブローカーは最初のメッセージが到着するとすぐに送信します。

プロパティfetch.min.byteskafka docsfetch.max.wait.msを参照してください。

fetch.min.bytesデフォルトでは 1 です。

編集

を呼び出す前に を試すこともできflush()ます。KafkaTemplategetRecords()

ただし、テストは 1 回のフェッチですべてのメッセージを取得することに依存するべきではありません。

于 2016-08-11T17:41:27.050 に答える