153

Java Producer API を使用して文字列メッセージを Kafka V. 0.8 に送信します。メッセージのサイズが約 15 MB の場合、MessageSizeTooLargeException. 40 MBに設定しようとしましmessage.max.bytesたが、それでも例外が発生します。小さなメッセージは問題なく機能しました。

(プロデューサーに例外が表示されます。このアプリケーションにはコンシューマーがありません。)

この例外を取り除くにはどうすればよいですか?

私のプロデューサー設定例

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

エラーログ:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
4

9 に答える 9

242

3 つ (または 4 つ) のプロパティを調整する必要があります。

  • コンシューマー側: fetch.message.max.bytes- これにより、コンシューマーが取得できるメッセージの最大サイズが決まります。
  • ブローカー側: replica.fetch.max.bytes- これにより、ブローカーのレプリカがクラスター内でメッセージを送信し、メッセージが正しくレプリケートされるようになります。これが小さすぎると、メッセージがレプリケートされないため、メッセージがコミット (完全にレプリケート) されないため、コンシューマーはメッセージを見ることができません。
  • ブローカー側: message.max.bytes- これは、ブローカーがプロデューサーから受信できるメッセージの最大サイズです。
  • ブローカー側 (トピックごと): max.message.bytes- これは、ブローカーがトピックに追加できるメッセージの最大サイズです。このサイズは圧縮前に検証されています。(デフォルトはブローカーのmessage.max.bytesです。)

2 番目の難しい方法を見つけました。Kafka から例外、メッセージ、または警告がまったく表示されないため、大きなメッセージを送信する場合は必ずこれを考慮してください。

于 2014-01-24T22:46:42.380 に答える
67

Kafka 0.10新しいコンシューマーに必要な小さな変更は、 laughing_man の回答と比較して:

  • message.max.bytesブローカー: 変更はありません。プロパティとを増やす必要がありますreplica.fetch.max.bytesmessage.max.bytesと等しいか、それよりも小さい (*) 必要がありreplica.fetch.max.bytesます。
  • max.request.sizeプロデューサー:より大きなメッセージを送信するために増やします。
  • コンシューマ:max.partition.fetch.bytesより大きなメッセージを受信するために増やします。

message.max.bytes(*) <=の詳細については、コメントをお読みください。replica.fetch.max.bytes

于 2016-08-18T20:13:21.847 に答える
16

次のプロパティをオーバーライドする必要があります。

ブローカー構成 ($KAFKA_HOME/config/server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Consumer Configs($KAFKA_HOME/config/consumer.properties)
このステップはうまくいきませんでした。コンシューマアプリに追加しましたが、正常に機能していました

  • fetch.message.max.bytes

サーバーを再起動します。

詳細については、このドキュメントを参照してください: http://kafka.apache.org/08/configuration.html

于 2014-02-17T09:56:58.873 に答える
7

message.max.bytes属性が消費者のプロパティと同期している必要があることを覚えておく必要があります。fetch.message.max.bytesフェッチ サイズは少なくとも最大メッセージ サイズと同じ大きさである必要があります。そうしないと、プロデューサーがコンシューマーが消費/フェッチできるよりも大きなメッセージを送信できる状況が発生する可能性があります。見てみる価値はあるかもしれません。
使用している Kafka のバージョンは? また、取得している詳細なトレースをいくつか提供してください。... のようなものがpayload size of xxxx larger than 1000000ログに表示されますか?

于 2014-01-09T20:25:09.383 に答える
5

ここでの回答のほとんどは、時代遅れであるか、完全ではないと思います。

Sacha Vetter の回答( Kafka 0.10の更新を含む)を参照するために、いくつかの追加情報と公式ドキュメントへのリンクを提供したいと思います。


プロデューサーの構成:

  • max.request.size(リンク) 1 MB を超えるファイルの場合は増やす必要があります。それ以外の場合は拒否されます

ブローカー/トピック構成:

  • message.max.bytesブローカーレベルでメッセージサイズを増やしたい場合は、(リンク)を設定できます。しかし、ドキュメントから: 「これは、トピック レベルの max.message.bytes 構成でトピックごとに設定できます。」
  • max.message.bytes(リンク) 1 つのトピックだけがより大きなファイルを受け入れることができる場合は、増加する可能性があります。ブローカー構成は変更しないでください。

Kafka クラスターのクライアントとして自分でトピックを構成できるため (たとえば、管理クライアントを使用して) 、トピックに制限された構成を常に好みます。ブローカーの構成自体には何の影響も及ぼさないかもしれません。


上記の回答では、必要に応じてさらにいくつかの構成が言及されています。

ドキュメントから: 「これは絶対的な最大値ではありません。フェッチの最初の空でないパーティションの最初のレコード バッチがこの値よりも大きい場合、レコード バッチは引き続き返され、確実に進行できるようになります。」

ドキュメントから: 「レコードはコンシューマーによってバッチでフェッチされます。フェッチの最初の空でないパーティションの最初のレコード バッチがこの制限よりも大きい場合、バッチは引き続き返され、コンシューマーが確実に処理を進めることができます。 "

ドキュメントから:「レコードは消費者によってバッチでフェッチされ、フェッチの最初の空でないパーティションの最初のレコード バッチがこの値よりも大きい場合、レコード バッチは引き続き返され、消費者が確実に作成できるようになります。進捗。"


結論:メッセージのフェッチに関する構成は、メッセージを処理するために変更する必要はありません。これらの構成のデフォルト値よりも大きくなっています (これは小規模なセットアップでテストされています)。おそらく、コンシューマーは常にサイズ 1 のバッチを取得する可能性があります。ただし、前の回答で述べたように、最初のブロックから 2 つの構成を設定する必要があります。

この明確化は、パフォーマンスについて何も伝えるべきではなく、これらの構成を設定する、または設定しないことを推奨するものではありません。最適な値は、具体的な計画されたスループットとデータ構造に応じて個別に評価する必要があります。

于 2021-05-27T12:19:03.390 に答える
3

landoop kafka を使用している場合: 次のような環境変数で構成値を渡すことができます。

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

また、rdkafka を使用している場合は、次のようにプロデューサー構成で message.max.bytes を渡します。

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

同様に、消費者にとっても、

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
于 2020-06-02T20:25:27.057 に答える