1

私は Akka (最新の安定バージョン)akka-camelと JMS (この会話の目的のために、ActiveMQ としましょうが、理想的にはソリューションは汎用的である必要があります) を使用しています。

ユースケース

次のユースケースがあります。キューで次のQようなメッセージを受け取ります。

time:   1     2    3    4    5    6

      | A1 | B1 | C1 | C2 | A2 | B2 | .... 
         ^                        ^
       first                     latest

(A1,A2)私の最終目標は、 、 、などでそれらをペアにすること(B1,B2)です。重複や未配信メッセージなどの複雑な問題にもかかわらず、ペア全体が一致して処理されるまで、ブローカーがすべての未確認メッセージを保持し続けるようにしなければならないという複雑な問題があります。

44 つのメッセージを受信して​​処理し、ペア を正常に処理しましたが、とはまだ一致せず保留中であり、JMS ではまでのすべてのメッセージを確認(C1, C2)することを意味するため、まだ何も確認できません。実際のところ、私が送り返すことができる最初の承認は、 が受信されたときです。この時点で、承認できます(保留中の のみ)。A1B1C2C25A2A1A1B1

問題

さて、私が理解できないように見えるのは、この種の遅延および非同期の確認応答をakka-camel. 私はオンラインで読んでいますが、メッセージを手動で確認する方法についての説明 (ドキュメント) を見つけることができますが、以前に処理されたメッセージをブローカーに確認する方法を示すものは何もありません。

import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure

class Consumer3 extends Consumer {
  override def autoAck = false

  def endpointUri = "jms:queue:test"

  def receive = {
    case msg: CamelMessage =>
      sender() ! Ack
      // on success
      // ..
      val someException = new Exception("e1")
      // on failure
      sender() ! Failure(someException)
  }
}

この場合、Ackは でありobject、そのセマンティックは本当に単純です:現在のメッセージ確認しますが、Xは以前のメッセージですが、必ずしも現在のメッセージであるとは限りません。

このユースケースはサポートされていますか、それともサポート可能akka-camelですか、それとも自分で構築する必要がありますか?

ありがとう

4

1 に答える 1