私は quarkus バージョンを使用しています:1.5.2.Final
およびリアクティブ メッセージングを含めるために次の依存関係を使用しています (送信側プロジェクトと受信側プロジェクトの間に Artemis の実行中の docker インスタンスがあります):
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
送信者プロジェクトには、@Outgoing('stock-quote')
毎秒乱数を生成して返すメソッドに関する注釈がありますFlowable<Message<String>>
。構成は次のとおりです。
mp.messaging.outgoing.stock-quote.connector=smallrye-amqp
mp.messaging.outgoing.stock-quote.address=stocks
mp.messaging.outgoing.stock-quote.durable=true
受信者プロジェクトでは、注釈付きのメソッドを持つ受信者が配置されてい@Incoming('stock)'.
ます。構成は次のとおりです。
mp.messaging.incoming.stocks.connector=smallrye-amqp
mp.messaging.incoming.stocks.durable=true
私は何に気づきますか?すべてが正常に機能します。なんらかの理由で受信者プロジェクトが停止した場合でも、送信者によってまだ送信されているメッセージは保持されます。これは良いことです。ただし、レシーバーが再びオンラインに戻ると、レシーバーは永続化されたメッセージを受信せず、代わりに別の「クライアント スレッド」で開始します。永続化されたメッセージはそこにとどまります。
@Broadcast
送信者側にも注釈を追加しようとしました。また、レシーバーの複数のインスタンスが必要なため、メッセージはレシーバーに分割されています。その後、レシーバーがオフラインになると、メッセージは再び永続化されます。レシーバーが復帰すると、新しいメッセージが送信されますが、持続メッセージはそこにとどまります (別の「クライアント スレッド」に留まり、さらに悪いことに、この「クライアント スレッド」の持続メッセージは (ブロードキャストのために) 蓄積されます)。
それで、私は何が欲しいですか?メッセージを送信する送信者が必要です。これらのメッセージは、2 つ以上の受信側インスタンスに分割されています。1 つのレシーバーがダウンした場合、メッセージは他のインスタンスによって処理されます。すべてのレシーバーがダウンしている場合、メッセージは永続化され、1 つ以上のレシーバーが再びオンラインになると、これらの永続化されたメッセージが処理されます。だから基本的に私はこれが欲しい:
アルテミスと組み合わせて quarkus リアクティブ メッセージを使用してこれを行う方法について、誰にもアイデアがありますか?