高レベルのコンシューマ API を使用して遅延コンシューマを実装したい
本旨:
- キーによってメッセージを生成します (各メッセージには作成タイムスタンプが含まれます)。
- auto.commit.enable=false (各メッセージ処理後に明示的にコミットします)
- メッセージを消費する
- メッセージのタイムスタンプを確認し、十分な時間が経過したかどうかを確認します
- メッセージの処理 (この操作は決して失敗しません)
コミット 1 オフセット
while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg }
この実装に関するいくつかの懸念:
- 各オフセットをコミットすると、ZK が遅くなる可能性があります
- consumer.commitOffsets は例外をスローできますか? はいの場合、同じメッセージを2回消費します(べき等メッセージで解決できます)
- オフセットをコミットせずに長時間待機する問題。たとえば、遅延期間が 24 時間で、イテレータから次を取得し、24 時間スリープし、処理してコミットします (ZK セッションのタイムアウト ?)
- 新しいオフセットをコミットせずに ZK セッションをキープアライブするにはどうすればよいですか? (ハイブzookeeper.session.timeout.msを設定すると、それを認識せずに死んだ消費者で解決できます)
- 不足している他の問題はありますか?
ありがとう!