私は Akka (最新の安定バージョン)akka-camel
と JMS (この会話の目的のために、ActiveMQ としましょうが、理想的にはソリューションは汎用的である必要があります) を使用しています。
ユースケース
次のユースケースがあります。キューで次のQ
ようなメッセージを受け取ります。
time: 1 2 3 4 5 6
| A1 | B1 | C1 | C2 | A2 | B2 | ....
^ ^
first latest
(A1,A2)
私の最終目標は、 、 、などでそれらをペアにすること(B1,B2)
です。重複や未配信メッセージなどの複雑な問題にもかかわらず、ペア全体が一致して処理されるまで、ブローカーがすべての未確認メッセージを保持し続けるようにしなければならないという複雑な問題があります。
例
で4
4 つのメッセージを受信して処理し、ペア を正常に処理しましたが、とはまだ一致せず保留中であり、JMS ではまでのすべてのメッセージを確認(C1, C2)
することを意味するため、まだ何も確認できません。実際のところ、私が送り返すことができる最初の承認は、 が受信されたときです。この時点で、承認できます(保留中の のみ)。A1
B1
C2
C2
5
A2
A1
A1
B1
問題
さて、私が理解できないように見えるのは、この種の遅延および非同期の確認応答をakka-camel
. 私はオンラインで読んでいますが、メッセージを手動で確認する方法についての説明 (ドキュメントと例) を見つけることができますが、以前に処理されたメッセージをブローカーに確認する方法を示すものは何もありません。
import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure
class Consumer3 extends Consumer {
override def autoAck = false
def endpointUri = "jms:queue:test"
def receive = {
case msg: CamelMessage =>
sender() ! Ack
// on success
// ..
val someException = new Exception("e1")
// on failure
sender() ! Failure(someException)
}
}
この場合、Ack
は でありobject
、そのセマンティックは本当に単純です:現在のメッセージを確認しますが、Xは以前のメッセージですが、必ずしも現在のメッセージであるとは限りません。
このユースケースはサポートされていますか、それともサポート可能akka-camel
ですか、それとも自分で構築する必要がありますか?
ありがとう