バックグラウンド
RabbitMQ とやり取りするためにlangohrを使用しています。サービスによってまだ適切に処理されていないメッセージを RabbitMQ が再送信できるようにするために、2 つの異なるアプローチを試みました。動作する 1 つの方法は、basic.nack
withrequeue
セットを に送信true
することですが、サービスがbasic.ack
. たとえば、サービスが現在ダウンしている (そしてしばらくダウンしている) データストアにメッセージを永続化しようとすると、これは少し問題になります。未配信のメッセージを 20 秒ごとに取得する方がよいでしょう (つまりbasic.ack
、basic.nack
データストアがダウンしている場合はメッセージをキューに保持するだけです)。私たちはExecutorService
、要点が次のように実装されている を使用してこれを実装しようとしました:
(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe
(log/info "Triggering \"recover\" for channel" chan)
(try
(lb/recover chan)
(catch Exception e (log/error "Failed to call recover" e))
(finally (lch/close chan))))
残念ながら、これは機能していないようです (メッセージは再配信されず、キューに残るだけです)。サービスを再起動すると、キューに入れられたメッセージが正しく消費されます。ただし、 spring-rabbitmq (Java) を使用して実装されている他のサービスがあり、それらはそのままでこれを処理しているようです。ソースコードを調べて、彼らがどのようにそれを行うのかを調べてみましたが、まだうまくいきません.
質問
RabbitMQ に定期的に (できれば Langohr を使用して) キュー内のメッセージを (再) 配信するように指示するにはどうすればよいですか?