0

バックグラウンド

RabbitMQ とやり取りするためにlangohrを使用しています。サービスによってまだ適切に処理されていないメッセージを RabbitMQ が再送信できるようにするために、2 つの異なるアプローチを試みました。動作する 1 つの方法は、basic.nackwithrequeueセットを に送信trueすることですが、サービスがbasic.ack. たとえば、サービスが現在ダウンしている (そしてしばらくダウンしている) データストアにメッセージを永続化しようとすると、これは少し問題になります。未配信のメッセージを 20 秒ごとに取得する方がよいでしょう (つまりbasic.ackbasic.nackデータストアがダウンしている場合はメッセージをキューに保持するだけです)。私たちはExecutorService、要点が次のように実装されている を使用してこれを実装しようとしました:

(let [chan (lch/open conn)]  ; We create a new channel since channels in Langohr are not thread-safe
    (log/info "Triggering \"recover\" for channel" chan)
    (try
      (lb/recover chan)
      (catch Exception e (log/error "Failed to call recover" e))
      (finally (lch/close chan))))

残念ながら、これは機能していないようです (メッセージは再配信されず、キューに残るだけです)。サービスを再起動すると、キューに入れられたメッセージが正しく消費されます。ただし、 spring-rabbitmq (Java) を使用して実装されている他のサービスがあり、それらはそのままでこれを処理しているようです。ソースコードを調べて、彼らがどのようにそれを行うのかを調べてみましたが、まだうまくいきません.

質問

RabbitMQ に定期的に (できれば Langohr を使用して) キュー内のメッセージを (再) 配信するように指示するにはどうすればよいですか?

4

2 に答える 2

2

Spring AMQP アプリで何をしているのかわかりませんが、RabbitMQ には何も組み込まれていません。

ただし、TTL を使用して配信不能を設定し、一定期間後に元のキューに再キューイングするのは非常に簡単です。例、リンクなどについては、この回答を参照してください。

編集

ただし、Spring AMQPには再試行中に一定期間コンシューマー スレッドを一時停止するように構成できる再試行インターセプターがあります

ステートフルな再試行の拒否と再キューイング。ステートレス再試行では、再試行が内部で処理され、再試行中にブローカーと対話することはありません。

于 2016-01-03T16:53:49.230 に答える
1

指示のあるこの回答を参照してください。メッセージをNACKし、NACKはメッセージをN秒間保留キューに入れ、次にTTLをそのキューから出して別のキューに入れ、元のキューに戻します。

設定に少し手間がかかりましたが、うまくいきました!

于 2016-01-03T19:08:00.190 に答える