問題タブ [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
1581 参照

apache-kafka - Kafka でブローカーを処理する

私は非同期モードでkafkaプロデューサーを使用していますが、すべてのブローカーがダウンすると同期のように動作し、metadata.fetch.timeout.msが期限切れになるまで待機します。これは私の場合は60秒です。私の最初の質問は、これは正常な動作ですか、それとも何か間違ったことをしているのですか?

私のロジックのトランザクションは最大 100 ミリ秒で終了する必要があるため、このタイムアウト値は私にとって非常に大きな遅延です。おそらく、metadata.fetch.timeout.ms を 10 ミリ秒に設定すると問題が解決する可能性がありますが、これがシステムにどのように影響するかはわかりません。これはどこかでボトルネックや CPU の大量消費を引き起こしますか?

別の可能な解決策は、作成を本当に非同期にするexecutorserviceでメッセージを作成することですが、物事をより複雑にしたくありません。誰もこれを前に試しましたか?

私の最後の質問は、スイッチメカニズムを使用して、すべてのブローカーがダウンしている場合は kafka へのプロデュースを無効にし、すべてのブローカーが稼働している場合は有効にすることです。カフカにハートビートの問題に対する機能はありますか?

ありがとう。

0 投票する
2 に答える
2575 参照

apache-kafka - Kafka 0.9プロデューサーで最大キュー時間を設定するには?

Kafka v0.9 で最大キュー時間を設定するには?

私の理解では、Kafka プロデューサーはキュー内のメッセージを特定のサイズにバッファリングしてから、キュー内のメッセージをバッチとしてブローカーに送信しようとします。ただし、Kafka v0.8 では、"queue.buffering.max.ms" の構成により、メッセージがバッファー サイズよりも小さい場合でも、制限時間に達するとキュー内のメッセージが送信されます。

バッチ処理は、固定された数のメッセージを蓄積しないように構成し、固定された遅延限界 (64k または 10 ミリ秒など) を超えないように構成できます。

Kafka 0.9 ドキュメントから: http://kafka.apache.org/documentation.html#design_asyncsend。(このセクションは Kafka 0.8.2 と同じです)

しかし、v0.9プロデューサー構成セクションで「queue.buffering.max.ms」または同等の構成が見つかりませんでしたhttp://kafka.apache.org/documentation.html#producerconfigs

Kafka v0.9でこれを構成する方法はまだありますか、KafkaProducer.close()それとも電話KafkaProducer.flush()する必要がありますか?

0 投票する
0 に答える
644 参照

java - カフカプロデューサーAPIエラースレッドメインkafka.common.FailedToSendMessageExceptionの例外

すべてのプロデューサー API プログラムでこのエラーが発生します。解決しようとしましたが、何が問題なのかを見つけることができません。

親切に私を助けてください。

前もって感謝します。

0 投票する
2 に答える
1253 参照

apache-kafka - 1 つのパーティションに複数のトピックがありますか?

私はただ興味があり、これに関する情報を見つけることができませんでした。私の質問は、単一のパーティションに複数のトピックを含めることができますか? はいの場合、そのパーティションでどのように生成されるか、または後で消費者によって消費されますか? それとも、1 つのパーティションが常に 1 つのトピックを保持するということですか?

0 投票する
1 に答える
5534 参照

scala - カフカProducerRecordとKeyedMessageの違いは何ですか?

kafka プロデューサー プロデューサーのパフォーマンスを測定しています。現在、構成と使用方法が少し異なる2つのクライアントに会いました。

一般:

最初のクライアント:

使用法:

2 番目のクライアント:

使用法:

私の質問は次のとおりです。

  • 2 つのクライアントの違いは何ですか?
  • 大規模なアプリケーションで最適な高負荷の書き込みパフォーマンスを実現するには、どのプロパティを考慮して構成する必要がありますか?
0 投票する
1 に答える
1064 参照

java - Kafka プロデューサーは、Java からトピックのパーティションを作成します

私は kafka プロデューサーを作成しており、メッセージ タイプに従ってメッセージを送信する際に partitionId も指定したいと考えています。Mysql テーブルに type-partitonId マッピングがあります。新しいメッセージ タイプの場合、新しいパーティションを作成し、その ID を Mysql に保存して、次に同じタイプが来たときにその ID にメッセージを直接送信できるようにする必要があります。トピックの新しいパーティションを作成し、作成された partitionId を取得するにはどうすればよいですか? 最新のカフカ API .9 と Java 1.8 を使用しています。

0 投票する
1 に答える
1813 参照

hadoop - 時間に基づくバケット レコード (kafka-hdfs-connector)

Confluent プラットフォームが提供する kafka-hdfs-connector を使用して、Kafka から Hive テーブルにデータをコピーしようとしています。私はそれをうまく行うことができましたが、時間間隔に基づいて着信データをバケット化する方法を考えていました。たとえば、5 分ごとに新しいパーティションを作成したいと考えています。

partition.duration.msでio.confluent.connect.hdfs.partitioner.TimeBasedPartitionerを試しましたが、間違った方法でやっていると思います。Hive テーブルには、すべてのデータがその特定のパーティションに入る 1 つのパーティションしか表示されません。このようなもの :

そして、すべての avro オブジェクトがこのパーティションにコピーされます。

代わりに、次のようなものが欲しいです:

最初に、コネクタはパスyear=2016/month=03/day=15/hour=19/minute=03を作成し、次の 5 分間、すべての受信データをこのディレクトリにコピーし続け、6 分の開始時に新しいパス、つまりyear=2016/month=03/day=15/hour=19/minute=08を作成し、次の 5 分間のデータをこのディレクトリにコピーする必要があります。

これは私の設定ファイルがどのように見えるかです:

誰かが私を正しい方向に向けることができれば、それは本当に役に立ちます。必要に応じて、詳細を共有していただければ幸いです。この質問を終わりのないもののように見せたくない.

どうもありがとう!