問題タブ [kafka-consumer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Kafka - 高レベルのコンシューマーを使用した遅延キューの実装
高レベルのコンシューマ API を使用して遅延コンシューマを実装したい
本旨:
- キーによってメッセージを生成します (各メッセージには作成タイムスタンプが含まれます)。
- auto.commit.enable=false (各メッセージ処理後に明示的にコミットします)
- メッセージを消費する
- メッセージのタイムスタンプを確認し、十分な時間が経過したかどうかを確認します
- メッセージの処理 (この操作は決して失敗しません)
コミット 1 オフセット
/li>
この実装に関するいくつかの懸念:
- 各オフセットをコミットすると、ZK が遅くなる可能性があります
- consumer.commitOffsets は例外をスローできますか? はいの場合、同じメッセージを2回消費します(べき等メッセージで解決できます)
- オフセットをコミットせずに長時間待機する問題。たとえば、遅延期間が 24 時間で、イテレータから次を取得し、24 時間スリープし、処理してコミットします (ZK セッションのタイムアウト ?)
- 新しいオフセットをコミットせずに ZK セッションをキープアライブするにはどうすればよいですか? (ハイブzookeeper.session.timeout.msを設定すると、それを認識せずに死んだ消費者で解決できます)
- 不足している他の問題はありますか?
ありがとう!
python - プログラムでPython Kafka Consumerを停止するには?
私は Python Kafka コンシューマを実行しています ( http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.htmlで kafka.consumer.SimpleConsumer または kafka.consumer.simple.SimpleConsumer を使用しようとしています)。次のコードを実行すると、すべてのメッセージが消費されたとしても、常に実行されます。すべてのメッセージを消費する場合、消費者が停止することを願っています。どうやってするの?また、stop() 関数 (基本クラス kafka.consumer.base.Consumer にあります) の使用方法もわかりません。
アップデート
シグナル ハンドラを使用して、consumer.stop() を呼び出しました。一部のエラー メッセージが画面に出力されました。しかし、プログラムはまだ for ループでスタックしていました。新しいメッセージが届くと、消費者はそれらを消費して印刷しました。client.close() も試しました。でも同じ結果。
for ループを適切に停止するには、いくつかの方法が必要です。
どんな助けでも大歓迎です。ありがとう。
apache-spark - Spark が消費した最新のオフセットを ZK または Kafka に保存し、再起動後に読み戻す方法
Kafka 0.8.2
AdExchange からデータを受信するために使用Spark Streaming 1.4.1
してから、データを に保存するために使用していMongoDB
ます。
私の問題は、Spark Streaming
たとえば、新しいバージョンの更新、バグの修正、新しい機能の追加など、ジョブを再起動するときです。その時点で最新offset
のものを引き続き読み取り、ジョブの再起動中に AdX が kafka にプッシュしたデータを失います。kafka
私は何かを試してみますauto.offset.reset -> smallest
が、0から受信します->最後に、データが巨大でdbで重複していました。
また、特定のものを設定しようとgroup.id
しconsumer.id
ますSpark
が、同じです。
offset
消費された最新のスパークを保存する方法、zookeeper
またはkafka
それから最新のものに読み戻す方法はoffset
?