問題タブ [kafka-producer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka でブローカーを処理する
私は非同期モードでkafkaプロデューサーを使用していますが、すべてのブローカーがダウンすると同期のように動作し、metadata.fetch.timeout.msが期限切れになるまで待機します。これは私の場合は60秒です。私の最初の質問は、これは正常な動作ですか、それとも何か間違ったことをしているのですか?
私のロジックのトランザクションは最大 100 ミリ秒で終了する必要があるため、このタイムアウト値は私にとって非常に大きな遅延です。おそらく、metadata.fetch.timeout.ms を 10 ミリ秒に設定すると問題が解決する可能性がありますが、これがシステムにどのように影響するかはわかりません。これはどこかでボトルネックや CPU の大量消費を引き起こしますか?
別の可能な解決策は、作成を本当に非同期にするexecutorserviceでメッセージを作成することですが、物事をより複雑にしたくありません。誰もこれを前に試しましたか?
私の最後の質問は、スイッチメカニズムを使用して、すべてのブローカーがダウンしている場合は kafka へのプロデュースを無効にし、すべてのブローカーが稼働している場合は有効にすることです。カフカにハートビートの問題に対する機能はありますか?
ありがとう。
apache-kafka - Kafka 0.9プロデューサーで最大キュー時間を設定するには?
Kafka v0.9 で最大キュー時間を設定するには?
私の理解では、Kafka プロデューサーはキュー内のメッセージを特定のサイズにバッファリングしてから、キュー内のメッセージをバッチとしてブローカーに送信しようとします。ただし、Kafka v0.8 では、"queue.buffering.max.ms" の構成により、メッセージがバッファー サイズよりも小さい場合でも、制限時間に達するとキュー内のメッセージが送信されます。
バッチ処理は、固定された数のメッセージを蓄積しないように構成し、固定された遅延限界 (64k または 10 ミリ秒など) を超えないように構成できます。
Kafka 0.9 ドキュメントから: http://kafka.apache.org/documentation.html#design_asyncsend。(このセクションは Kafka 0.8.2 と同じです)
しかし、v0.9プロデューサー構成セクションで「queue.buffering.max.ms」または同等の構成が見つかりませんでしたhttp://kafka.apache.org/documentation.html#producerconfigs
Kafka v0.9でこれを構成する方法はまだありますか、KafkaProducer.close()
それとも電話KafkaProducer.flush()
する必要がありますか?
java - カフカプロデューサーAPIエラースレッドメインkafka.common.FailedToSendMessageExceptionの例外
すべてのプロデューサー API プログラムでこのエラーが発生します。解決しようとしましたが、何が問題なのかを見つけることができません。
親切に私を助けてください。
前もって感謝します。
apache-kafka - 1 つのパーティションに複数のトピックがありますか?
私はただ興味があり、これに関する情報を見つけることができませんでした。私の質問は、単一のパーティションに複数のトピックを含めることができますか? はいの場合、そのパーティションでどのように生成されるか、または後で消費者によって消費されますか? それとも、1 つのパーティションが常に 1 つのトピックを保持するということですか?
scala - カフカProducerRecordとKeyedMessageの違いは何ですか?
kafka プロデューサー プロデューサーのパフォーマンスを測定しています。現在、構成と使用方法が少し異なる2つのクライアントに会いました。
一般:
最初のクライアント:
使用法:
2 番目のクライアント:
使用法:
私の質問は次のとおりです。
- 2 つのクライアントの違いは何ですか?
- 大規模なアプリケーションで最適な高負荷の書き込みパフォーマンスを実現するには、どのプロパティを考慮して構成する必要がありますか?
java - Kafka プロデューサーは、Java からトピックのパーティションを作成します
私は kafka プロデューサーを作成しており、メッセージ タイプに従ってメッセージを送信する際に partitionId も指定したいと考えています。Mysql テーブルに type-partitonId マッピングがあります。新しいメッセージ タイプの場合、新しいパーティションを作成し、その ID を Mysql に保存して、次に同じタイプが来たときにその ID にメッセージを直接送信できるようにする必要があります。トピックの新しいパーティションを作成し、作成された partitionId を取得するにはどうすればよいですか? 最新のカフカ API .9 と Java 1.8 を使用しています。
hadoop - 時間に基づくバケット レコード (kafka-hdfs-connector)
Confluent プラットフォームが提供する kafka-hdfs-connector を使用して、Kafka から Hive テーブルにデータをコピーしようとしています。私はそれをうまく行うことができましたが、時間間隔に基づいて着信データをバケット化する方法を考えていました。たとえば、5 分ごとに新しいパーティションを作成したいと考えています。
partition.duration.msでio.confluent.connect.hdfs.partitioner.TimeBasedPartitionerを試しましたが、間違った方法でやっていると思います。Hive テーブルには、すべてのデータがその特定のパーティションに入る 1 つのパーティションしか表示されません。このようなもの :
そして、すべての avro オブジェクトがこのパーティションにコピーされます。
代わりに、次のようなものが欲しいです:
最初に、コネクタはパスyear=2016/month=03/day=15/hour=19/minute=03を作成し、次の 5 分間、すべての受信データをこのディレクトリにコピーし続け、6 分の開始時に新しいパス、つまりyear=2016/month=03/day=15/hour=19/minute=08を作成し、次の 5 分間のデータをこのディレクトリにコピーする必要があります。
これは私の設定ファイルがどのように見えるかです:
誰かが私を正しい方向に向けることができれば、それは本当に役に立ちます。必要に応じて、詳細を共有していただければ幸いです。この質問を終わりのないもののように見せたくない.
どうもありがとう!