小さなメッセージを送信するとき、Java Kafka Producer 0.9 クライアントで非常に低いパフォーマンスが観察されています。メッセージは大きな要求バッチに蓄積されないため、小さなレコードがそれぞれ個別に送信されます。
クライアント構成の何が問題になっていますか? それとも、これは別の問題ですか?
Kafka クライアント 0.9.0.0 を使用します。Kafka の未リリースの 9.0.1 または 9.1 の修正済みリストまたは未解決リストに関連する投稿は見られなかったため、クライアント構成とサーバー インスタンスに焦点を当てています。
linger.ms によって、クライアントがレコードをバッチに蓄積する必要があることは理解しています。
linger.ms を 10 に設定しました (そして 100 と 1000 も試しました) が、これらはバッチ蓄積レコードにはなりませんでした。レコード サイズが約 100 バイトで、リクエスト バッファ サイズが 16K の場合、1 回のリクエストで約 160 個のメッセージが送信されると予想されます。
クライアントでのトレースは、新しい Bluemix Messaging Hub (Kafka Server 0.9) サービス インスタンスを割り当てたにもかかわらず、パーティションがいっぱいである可能性があることを示しているようです。テスト クライアントは、他の I/O なしで複数のメッセージをループで送信しています。
ログには、「トピック mytopic パーティション 0 がいっぱいになっているか、新しいバッチを取得しているため、送信者を目覚めさせる」という疑わしい行を含む繰り返しシーケンスが表示されます。
テスト ケースでは、新しく割り当てられたパーティションは本質的に空である必要があります。
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - 送信レコード: Topic='mytopic'、Key='records'、Value='Kafka 0.9 Java クライアント レコード テスト メッセージ00011 2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 送信レコード ProducerRecord(topic=mytopic、partition=null、key=[B@670b40af、value=[Bトピック mytopic パーティション 0 へのコールバックが null の @ 4923ab24
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - トピック mytopic パーティション 0 に新しい 16384 バイトのメッセージ バッファを割り当てています
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - トピック mytopic パーティション 0 がいっぱいになっているか、新しいバッチを取得しているため、送信者を目覚めさせる
2015-12-10 15:14:41,348 3690 [カフカ プロデューサー ネットワーク スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - データを送信する準備ができているノード: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [カフカ プロデューサー ネットワーク スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 1 つのプロデュース リクエストを作成しました: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request= RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0, record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [カフカ-プロデューサー-ネットワーク-スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - ノード 0 から相関 ID 11 のプロデュース応答を受信しました
2015-12-10 15:14:41,412 3754 [カフカ-プロデューサー-ネットワーク-スレッド | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - ベース オフセット オフセット 130 およびエラー: null でトピック パーティション mytopic-0 にメッセージを生成しました。
2015-12-10 15:14:41,412 3754 [メイン] トレース com.isllc.client.producer.ExploreProducer - 返されたメタデータを送信: トピック = 'mytopic'、パーティション = 0、オフセット = 130
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - レコードの送信: Topic='mytopic'、Key='records'、Value='Kafka 0.9 Java クライアント レコード テスト メッセージ00012 2015-12-10T15:14:41.412-05:00'
ログエントリは、送信されたレコードごとに上記のように繰り返されます
次のプロパティ ファイルが用意されています。
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Kafka クライアントのファイルから取得されたプロパティ: kafka-producer.properties 2015-12-10 15:14:37,909 251 [メイン] INFO com.isllc.client.AbstractClient - acks=-1 2015-12-10 15:14:37,909 251 [メイン] 情報 com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2 2015-12-10 15:14:37,909 251 [メイン] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer 2015-12-10 15:14:37,910 252 [メイン] 情報 com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib /セキュリティ/cacerts 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub .services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05 -prod01.messagehub.services.us-south.bluemix.net:9094 2015-12-10 15:14:37,910 252 [メイン] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL さらに、コードに linger.ms=10 を追加しました。
Kafka クライアントは、展開/マージされた構成リストを表示します (および linger.ms 設定を表示します)。
2015-12-10 15:14:37,970 312 [メイン] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig 値:
圧縮タイプ = なし
メトリック.レポーター = []
metadata.max.age.ms = 300000
メタデータ.fetch.timeout.ms = 60000
再接続.バックオフ.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us -south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
再試行.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
タイムアウト.ms = 30000
key.serializer = クラス org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [非表示]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = ExploreProducer
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
アック = -1
バッチサイズ = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
再試行 = 0
max.request.size = 1048576
value.serializer = クラス org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
メトリックス.サンプル.ウィンドウ.ms = 30000
partitioner.class = クラス org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 10
100 レコードを送信した後の Kafka メトリクス:
100 送信の期間は 8787 ミリ秒。7687 バイトを送信しました。
batch-size-avg = 109.87 [リクエストごとにパーティションごとに送信される平均バイト数]
batch-size-max = 110.0 [リクエストごとにパーティションごとに送信される最大バイト数]
buffer-available-bytes = 3.3554432E7 [使用されていない (未割り当てまたは空きリストにある) バッファ メモリの合計量]
buffer-exhausted-rate = 0.0 [バッファの枯渇によりドロップされたレコード送信の 1 秒あたりの平均数]
buffer-total-bytes = 3.3554432E7 [クライアントが使用できるバッファ メモリの最大量 (現在使用されているかどうかにかかわらず)]
bufferpool-wait-ratio = 0.0 [アペンダーがスペース割り当てを待機する時間の割合。]
バイトレート = 291.8348916277093 []
圧縮率 = 0.0 []
compression-rate-avg = 0.0 [レコード バッチの平均圧縮率]
connection-close-rate = 0.0 [ウィンドウ内で 1 秒あたりに閉じられた接続数]
connection-count = 2.0 [アクティブな接続の現在の数]
connection-creation-rate = 0.05180541884681138 [ウィンドウで 1 秒あたりに確立された新しい接続。]
着信バイト レート = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [I/O スレッドが I/O に費やした時間の割合]
io-time-ns-avg = 353749.2840375587 [選択呼び出しごとの I/O の平均時間 (ナノ秒)]
io-wait-ratio = 0.21531227995769162 [I/O スレッドが待機に費やした時間の割合]
io-wait-time-ns-avg = 1.9591901192488264E7 [I/O スレッドが読み取りまたは書き込みの準備ができたソケットを待機するのに費やした平均時間 (ナノ秒単位)]
metadata-age = 8.096 [現在使用されているプロデューサ メタデータの経過時間 (秒)]
network-io-rate = 5.2937784999213795 [すべての接続での 1 秒あたりのネットワーク操作 (読み取りまたは書き込み) の平均数]
発信バイト レート = 451.2298783403283 []
Produce-throttle-time-avg = 0.0 [ミリ秒単位の平均スロットル時間]
Produce-throttle-time-max = 0.0 [ミリ秒単位の最大スロットル時間]
record-error-rate = 0.0 [エラーになったレコード送信の 1 秒あたりの平均数]
record-queue-time-avg = 15.5 [レコード アキュムレータで費やされたミリ秒単位のレコード バッチの平均時間]
record-queue-time-max = 434.0 [レコード アキュムレータで費やされたミリ秒単位のレコード バッチの最大時間。]
記録再試行率 = 0.0 []
record-send-rate = 2.65611304417116 [1 秒あたりの平均送信レコード数]
record-size-avg = 97.87 [平均レコード サイズ]
record-size-max = 98.0 [最大レコード サイズ]
records-per-request-avg = 1.0 [リクエストあたりの平均レコード数]
request-latency-avg = 0.0 [ミリ秒単位の平均リクエスト レイテンシ]
リクエスト遅延最大 = 74.0 []
request-rate = 2.6468892499606897 [1 秒あたりの平均送信リクエスト数]
request-size-avg = 42.0 [ウィンドウ内のすべてのリクエストの平均サイズ..]
request-size-max = 170.0 [ウィンドウ内で送信されるリクエストの最大サイズ]
requests-in-flight = 0.0 [応答を待っている進行中のリクエストの現在の数]
response-rate = 2.651196976060479 [1 秒あたりの平均応答数]
select-rate = 10.989861465830819 [1 秒あたりに実行する新しい I/O を I/O レイヤーがチェックした回数]
waiting-threads = 0.0 [バッファ メモリがレコードをキューに入れるのを待ってブロックされたユーザー スレッドの数]
ありがとう