問題タブ [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
5 に答える
34097 参照

java - Kafka - 高レベルのコンシューマーを使用した遅延キューの実装

高レベルのコンシューマ API を使用して遅延コンシューマを実装したい

本旨:

  • キーによってメッセージを生成します (各メッセージには作成タイムスタンプが含まれます)。
  • auto.commit.enable=false (各メッセージ処理後に明示的にコミットします)
  • メッセージを消費する
  • メッセージのタイムスタンプを確認し、十分な時間が経過したかどうかを確認します
  • メッセージの処理 (この操作は決して失敗しません)
  • コミット 1 オフセット

    /li>

この実装に関するいくつかの懸念:

  1. 各オフセットをコミットすると、ZK が遅くなる可能性があります
  2. consumer.commitOffsets は例外をスローできますか? はいの場合、同じメッセージを2回消費します(べき等メッセージで解決できます)
  3. オフセットをコミットせずに長時間待機する問題。たとえば、遅延期間が 24 時間で、イテレータから次を取得し、24 時間スリープし、処理してコミットします (ZK セッションのタイムアウト ?)
  4. 新しいオフセットをコミットせずに ZK セッションをキープアライブするにはどうすればよいですか? (ハイブzookeeper.session.timeout.msを設定すると、それを認識せずに死んだ消費者で解決できます)
  5. 不足している他の問題はありますか?

ありがとう!

0 投票する
4 に答える
12515 参照

python - プログラムでPython Kafka Consumerを停止するには?

私は Python Kafka コンシューマを実行しています ( http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.htmlで kafka.consumer.SimpleConsumer または kafka.consumer.simple.SimpleConsumer を使用しようとしています)。次のコードを実行すると、すべてのメッセージが消費されたとしても、常に実行されます。すべてのメッセージを消費する場合、消費者が停止することを願っています。どうやってするの?また、stop() 関数 (基本クラス kafka.consumer.base.Consumer にあります) の使用方法もわかりません。

アップデート

シグナル ハンドラを使用して、consumer.stop() を呼び出しました。一部のエラー メッセージが画面に出力されました。しかし、プログラムはまだ for ループでスタックしていました。新しいメッセージが届くと、消費者はそれらを消費して印刷しました。client.close() も試しました。でも同じ結果。

for ループを適切に停止するには、いくつかの方法が必要です。

どんな助けでも大歓迎です。ありがとう。

0 投票する
4 に答える
15801 参照

apache-spark - Spark が消費した最新のオフセットを ZK または Kafka に保存し、再起動後に読み戻す方法

Kafka 0.8.2AdExchange からデータを受信するために使用Spark Streaming 1.4.1してから、データを に保存するために使用していMongoDBます。

私の問題は、Spark Streamingたとえば、新しいバージョンの更新、バグの修正、新しい機能の追加など、ジョブを再起動するときです。その時点で最新offsetのものを引き続き読み取り、ジョブの再起動中に AdX が kafka にプッシュしたデータを失います。kafka

私は何かを試してみますauto.offset.reset -> smallestが、0から受信します->最後に、データが巨大でdbで重複していました。

また、特定のものを設定しようとgroup.idconsumer.idますSparkが、同じです。

offset消費された最新のスパークを保存する方法、zookeeperまたはkafkaそれから最新のものに読み戻す方法はoffset?