28

高レベルのコンシューマ API を使用して遅延コンシューマを実装したい

本旨:

  • キーによってメッセージを生成します (各メッセージには作成タイムスタンプが含まれます)。
  • auto.commit.enable=false (各メッセージ処理後に明示的にコミットします)
  • メッセージを消費する
  • メッセージのタイムスタンプを確認し、十分な時間が経過したかどうかを確認します
  • メッセージの処理 (この操作は決して失敗しません)
  • コミット 1 オフセット

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) { 
         waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }
    

この実装に関するいくつかの懸念:

  1. 各オフセットをコミットすると、ZK が遅くなる可能性があります
  2. consumer.commitOffsets は例外をスローできますか? はいの場合、同じメッセージを2回消費します(べき等メッセージで解決できます)
  3. オフセットをコミットせずに長時間待機する問題。たとえば、遅延期間が 24 時間で、イテレータから次を取得し、24 時間スリープし、処理してコミットします (ZK セッションのタイムアウト ?)
  4. 新しいオフセットをコミットせずに ZK セッションをキープアライブするにはどうすればよいですか? (ハイブzookeeper.session.timeout.msを設定すると、それを認識せずに死んだ消費者で解決できます)
  5. 不足している他の問題はありますか?

ありがとう!

4

5 に答える 5

24

これを行う 1 つの方法は、遅延するすべてのメッセージをプッシュする別のトピックを使用することです。すべての遅延メッセージを同じ時間遅延後に処理する必要がある場合、これはかなり簡単です。

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

すべての通常のメッセージはできるだけ早く処理されるようになり、遅延が必要なメッセージは別のトピックに置かれます。

良いことは、delayTo 値が最小になるため、遅延トピックの先頭にあるメッセージが最初に処理されるべきメッセージであることがわかっていることです。したがって、ヘッドメッセージを読み取り、タイムスタンプが過去のものかどうかをチェックし、そうであればメッセージを処理してオフセットをコミットする別のコンシューマを設定できます。そうでない場合は、オフセットをコミットせず、代わりにその時までスリープします。

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

異なる遅延時間がある場合は、遅延に関するトピックを分割できます (例: 24 時間、12 時間、6 時間)。遅延時間がそれよりも動的である場合は、もう少し複雑になります。2 つの遅延トピックを導入することで解決できます。遅延トピックからすべてのメッセージを読み取り、値が過去のAすべてのメッセージを処理します。delayTo他のものの中から、最も近いものを見つけて、delayToそれらをトピックに入れますB。最も近いものを処理する必要があるまでスリープし、すべてを逆に実行します。つまり、トピックからのメッセージを処理し、Bまだ処理してはならないメッセージをトピックに戻しますA

特定の質問に答えるには(質問へのコメントで対処されているものもあります)

  1. 各オフセットをコミットすると、ZK が遅くなる可能性があります

オフセットを Kafka に保存するように切り替えることを検討できます (0.8.2 から利用可能な機能offsets.storage、コンシューマー構成のプロパティを確認してください)。

  1. consumer.commitOffsets は例外をスローできますか? はいの場合、同じメッセージを2回消費します(べき等メッセージで解決できます)

たとえば、オフセットストレージと通信できない場合は、できると思います。あなたが言うように、べき等メッセージを使用すると、この問題は解決します。

  1. オフセットをコミットせずに長時間待機する問題。たとえば、遅延期間が 24 時間で、イテレータから次を取得し、24 時間スリープし、処理してコミットします (ZK セッションのタイムアウト?)

これは、メッセージ自体の処理にセッション タイムアウト以上の時間がかからない限り、上記のソリューションでは問題になりません。

  1. 新しいオフセットをコミットせずに ZK セッションをキープアライブするにはどうすればよいですか? (ハイブzookeeper.session.timeout.msを設定すると、それを認識せずに死んだ消費者で解決できます)

上記の場合でも、長いセッション タイムアウトを設定する必要はありません。

  1. 私が見逃している他の問題はありますか?

常にあります;)

于 2015-08-20T09:23:26.060 に答える
3

あなたの場合は別のルートをお勧めします。

コンシューマーのメインスレッドで待機時間に対処することは意味がありません。これは、キューの使用方法におけるアンチパターンになります。概念的には、メッセージをできるだけ速く処理し、キューの負荷率を低く保つ必要があります。

代わりに、遅延が必要なメッセージごとにジョブをスケジュールするスケジューラを使用します。このようにして、キューを処理し、定義済みの時点でトリガーされる非同期ジョブを作成できます。

この手法を使用することの欠点は、スケジュールされたジョブをメモリに保持する JVM の状態に敏感であることです。その JVM に障害が発生すると、スケジュールされたジョブが失われ、タスクが実行されたか実行されなかったかがわかりません。

スケジューラーの実装がありますが、クラスター環境で実行するように構成できるため、JVM のクラッシュから安全に保つことができます。

この Java スケジューリング フレームワークを見てみましょう: http://www.quartz-scheduler.org/

于 2016-09-23T09:52:21.840 に答える