Apache Kafka Java クライアント (0.9) を使用して、 Kafka Producer クラスを使用して長い一連のレコードをブローカーに送信しようとしています。
非同期送信メソッドは、しばらくの間すぐに戻り、その後、各呼び出しで短時間ブロックを開始します。約 30 秒後、クライアントは例外 ( TimeoutException ) のスローを開始し、メッセージ"Batch expired"が表示されます。
この例外がスローされる原因は何ですか?
Apache Kafka Java クライアント (0.9) を使用して、 Kafka Producer クラスを使用して長い一連のレコードをブローカーに送信しようとしています。
非同期送信メソッドは、しばらくの間すぐに戻り、その後、各呼び出しで短時間ブロックを開始します。約 30 秒後、クライアントは例外 ( TimeoutException ) のスローを開始し、メッセージ"Batch expired"が表示されます。
この例外がスローされる原因は何ですか?
この例外は、送信できる速度よりも速い速度でレコードをキューに入れていることを示しています。
sendメソッドを呼び出すと、ブローカーに送信するためにProducerRecordが内部バッファーに格納されます。このメソッドは、送信されたかどうかに関係なく、 ProducerRecordがバッファリングされるとすぐに戻ります。
レコードはブローカーに送信するためにバッチにグループ化され、メッセージごとのトランスポート オーバーヘッドを削減し、スループットを向上させます。
レコードがバッチに追加されると、指定された期間内に送信されたことを確認するために、そのバッチを送信するための時間制限があります。これは Producer 構成パラメーターrequest.timeout.msによって制御され、デフォルトは 30 秒です。
バッチがタイムアウト制限よりも長くキューに入れられている場合、例外がスローされます。そのバッチ内のレコードは、送信キューから削除されます。
構成パラメーターを使用してタイムアウト制限を増やすと、クライアントは有効期限が切れる前により長くバッチをキューに入れることができます。
この例外は、まったく異なるコンテキストで発生しました。
ZooKeeper VM、Broker VM、Producer/Consumer VM のミニ クラスタをセットアップしました。サーバー (9092) と Zookeeper (2181) で必要なすべてのポートを開き、コンシューマー/パブリッシャー vm からブローカーにメッセージをパブリッシュしようとしました。OPで言及された例外を受け取りましたが、これまでに1つのメッセージしか公開していなかった(または少なくとも試みた)ため、解決策はタイムアウトまたはバッチサイズを増やすことではありませんでした。そこで検索したところ、コンシューマー/プロデューサー vm (ClosedChannelException) 内からメッセージを消費しようとしたときに発生した同様の問題を説明しているこのメーリング リストを見つけました: http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble -with-the-simplest-remote-kafka-config このメーリング リストの最後の投稿では、実際に問題を解決する方法について説明しています。
簡単に言えばChannelClosedException
、例外とBatch Expired
例外の両方に直面した場合、ファイル内のこの行を次のように変更しserver.config
、ブローカーを再起動する必要があります。
advertised.host.name=<broker public IP address>
設定されていない場合は、host.name
プロパティにフォールバックし (どちらも設定されていない可能性があります)、InetAddress
Java クラスの正規のホスト名にフォールバックしますが、これは最終的に正しくなく、リモート ノードを混乱させます。
ブローカーに送信するまでの時間を制御するパラメーターはlinger.ms
. デフォルト値は 0 (遅延なし) です。