1

小さなメッセージを送信するとき、Java Kafka Producer 0.9 クライアントで非常に低いパフォーマンスが観察されています。メッセージは大きな要求バッチに蓄積されないため、小さなレコードがそれぞれ個別に送信されます。

クライアント構成の何が問題になっていますか? それとも、これは別の問題ですか?


Kafka クライアント 0.9.0.0 を使用します。Kafka の未リリースの 9.0.1 または 9.1 の修正済みリストまたは未解決リストに関連する投稿は見られなかったため、クライアント構成とサーバー インスタンスに焦点を当てています。

linger.ms によって、クライアントがレコードをバッチに蓄積する必要があることは理解しています。

linger.ms を 10 に設定しました (そして 100 と 1000 も試しました) が、これらはバッチ蓄積レコードにはなりませんでした。レコード サイズが約 100 バイトで、リクエスト バッファ サイズが 16K の場合、1 回のリクエストで約 160 個のメッセージが送信されると予想されます。

クライアントでのトレースは、新しい Bluemix Messaging Hub (Kafka Server 0.9) サービス インスタンスを割り当てたにもかかわらず、パーティションがいっぱいである可能性があることを示しているようです。テスト クライアントは、他の I/O なしで複数のメッセージをループで送信しています。


ログには、「トピック mytopic パーティション 0 がいっぱいになっているか、新しいバッチを取得しているため、送信者を目覚めさせる」という疑わしい行を含む繰り返しシーケンスが表示されます。

テスト ケースでは、新しく割り当てられたパーティションは本質的に空である必要があります。

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - 送信レコード: Topic='mytopic'、Key='records'、Value='Kafka 0.9 Java クライアント レコード テスト メッセージ00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 送信レコード ProducerRecord(topic=mytopic、partition=null、key=[B@670b40af、value=[Bトピック mytopic パーティション 0 へのコールバックが null の @ 4923ab24  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - トピック mytopic パーティション 0 に新しい 16384 バイトのメッセージ バッファを割り当てています  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - トピック mytopic パーティション 0 がいっぱいになっているか、新しいバッチを取得しているため、送信者を目覚めさせる  
2015-12-10 15:14:41,348 3690 [カフカ プロデューサー ネットワーク スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - データを送信する準備ができているノード: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [カフカ プロデューサー ネットワーク スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 1 つのプロデュース リクエストを作成しました: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request= RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0, record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [カフカ-プロデューサー-ネットワーク-スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - ノード 0 から相関 ID 11 のプロデュース応答を受信しました  
2015-12-10 15:14:41,412 3754 [カフカ-プロデューサー-ネットワーク-スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - ベース オフセット オフセット 130 およびエラー: null でトピック パーティション mytopic-0 にメッセージを生成しました。  
2015-12-10 15:14:41,412 3754 [メイン] トレース com.isllc.client.producer.ExploreProducer - 返されたメタデータを送信: トピック = 'mytopic'、パーティション = 0、オフセット = 130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - レコードの送信: Topic='mytopic'、Key='records'、Value='Kafka 0.9 Java クライアント レコード テスト メッセージ00012 2015-12-10T15:14:41.412-05:00'

ログエントリは、送信されたレコードごとに上記のように繰り返されます

次のプロパティ ファイルが用意されています。

2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Kafka クライアントのファイルから取得されたプロパティ: kafka-producer.properties
2015-12-10 15:14:37,909 251 [メイン] INFO com.isllc.client.AbstractClient - acks=-1
2015-12-10 15:14:37,909 251 [メイン] 情報 com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [メイン] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [メイン] 情報 com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib /セキュリティ/cacerts
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub .services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05 -prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL

さらに、コードに linger.ms=10 を追加しました。

Kafka クライアントは、展開/マージされた構成リストを表示します (および linger.ms 設定を表示します)。

2015-12-10 15:14:37,970 312 [メイン] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig 値:
    圧縮タイプ = なし
    メトリック.レポーター = []
    metadata.max.age.ms = 300000
    メタデータ.fetch.timeout.ms = 60000
    再接続.バックオフ.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us -south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    再試行.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    タイムアウト.ms = 30000
    key.serializer = クラス org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [非表示]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    アック = -1
    バッチサイズ = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    再試行 = 0
    max.request.size = 1048576
    value.serializer = クラス org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    メトリックス.サンプル.ウィンドウ.ms = 30000
    partitioner.class = クラス org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10

100 レコードを送信した後の Kafka メトリクス:

100 送信の期間は 8787 ミリ秒。7687 バイトを送信しました。  
    batch-size-avg = 109.87 [リクエストごとにパーティションごとに送信される平均バイト数]  
    batch-size-max = 110.0 [リクエストごとにパーティションごとに送信される最大バイト数]  
    buffer-available-bytes = 3.3554432E7 [使用されていない (未割り当てまたは空きリストにある) バッファ メモリの合計量]  
    buffer-exhausted-rate = 0.0 [バッファの枯渇によりドロップされたレコード送信の 1 秒あたりの平均数]  
    buffer-total-bytes = 3.3554432E7 [クライアントが使用できるバッファ メモリの最大量 (現在使用されているかどうかにかかわらず)]  
    bufferpool-wait-ratio = 0.0 [アペンダーがスペース割り当てを待機する時間の割合。]  
    バイトレート = 291.8348916277093 []  
    圧縮率 = 0.0 []  
    compression-rate-avg = 0.0 [レコード バッチの平均圧縮率]  
    connection-close-rate = 0.0 [ウィンドウ内で 1 秒あたりに閉じられた接続数]  
    connection-count = 2.0 [アクティブな接続の現在の数]  
    connection-creation-rate = 0.05180541884681138 [ウィンドウで 1 秒あたりに確立された新しい接続。]  
    着信バイト レート = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [I/O スレッドが I/O に費やした時間の割合]  
    io-time-ns-avg = 353749.2840375587 [選択呼び出しごとの I/O の平均時間 (ナノ秒)]  
    io-wait-ratio = 0.21531227995769162 [I/O スレッドが待機に費やした時間の割合]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [I/O スレッドが読み取りまたは書き込みの準備ができたソケットを待機するのに費やした平均時間 (ナノ秒単位)]
    metadata-age = 8.096 [現在使用されているプロデューサ メタデータの経過時間 (秒)]  
    network-io-rate = 5.2937784999213795 [すべての接続での 1 秒あたりのネットワーク操作 (読み取りまたは書き込み) の平均数]  
    発信バイト レート = 451.2298783403283 []  
    Produce-throttle-time-avg = 0.0 [ミリ秒単位の平均スロットル時間]  
    Produce-throttle-time-max = 0.0 [ミリ秒単位の最大スロットル時間]  
    record-error-rate = 0.0 [エラーになったレコード送信の 1 秒あたりの平均数]  
    record-queue-time-avg = 15.5 [レコード アキュムレータで費やされたミリ秒単位のレコード バッチの平均時間]  
    record-queue-time-max = 434.0 [レコード アキュムレータで費やされたミリ秒単位のレコード バッチの最大時間。]  
    記録再試行率 = 0.0 []  
    record-send-rate = 2.65611304417116 [1 秒あたりの平均送信レコード数]  
    record-size-avg = 97.87 [平均レコード サイズ]  
    record-size-max = 98.0 [最大レコード サイズ]  
    records-per-request-avg = 1.0 [リクエストあたりの平均レコード数]  
    request-latency-avg = 0.0 [ミリ秒単位の平均リクエスト レイテンシ]  
    リクエスト遅延最大 = 74.0 []  
    request-rate = 2.6468892499606897 [1 秒あたりの平均送信リクエスト数]  
    request-size-avg = 42.0 [ウィンドウ内のすべてのリクエストの平均サイズ..]  
    request-size-max = 170.0 [ウィンドウ内で送信されるリクエストの最大サイズ]  
    requests-in-flight = 0.0 [応答を待っている進行中のリクエストの現在の数]  
    response-rate = 2.651196976060479 [1 秒あたりの平均応答数]  
    select-rate = 10.989861465830819 [1 秒あたりに実行する新しい I/O を I/O レイヤーがチェックした回数]  
    waiting-threads = 0.0 [バッファ メモリがレコードをキューに入れるのを待ってブロックされたユーザー スレッドの数]  

ありがとう

4

1 に答える 1

5

Kafka Users メーリング リストの Guozhang Wang は、アプリケーション コードを確認することで問題を認識できました。

国章、

はい - あなたは問題を特定しました!

デバッグのために .get() を挿入しましたが、(巨大な!) 副作用については考えていませんでした。

非同期コールバックを使用すると、完全にうまく機能します。

ラップトップから Bluemix クラウドに 14 秒で 100,000 レコードを送信できるようになりました。

どうもありがとうございました!

ゲイリー


2015 年 12 月 13 日午後 2 時 48 分、Guozhang Wang は次のように書いています。

ゲイリー

「kafkaProducer.send(record).get();」を呼び出しています 各メッセージについて、get() 呼び出しは Future が初期化されるまでブロックされます。これにより、次のメッセージを送信する前に各メッセージの ACK を要求することで、送信されたすべてのメッセージが効果的に同期されるため、バッチ処理は行われません。

非同期送信に「send(record, callback)」を使用してみて、返されたメタデータからのエラーをコールバックに処理させることができます。

国章

于 2015-12-13T22:38:59.447 に答える