2

RabbitMQ からのメッセージの処理中に例外が発生した場合、特定のメッセージを確認解除して別のキューに戻すか、同じキューに再キューイングするか、メッセージを完全に破棄したかっただけです (basicNack の最後の Boolean flag@requeue に従って)。

全体のアイデアは後で、未確認メッセージの数を取得し、同じチャネルに何度も再キューイングする代わりにメッセージ形式などを確認でき、未確認の信号を現在のチャネルに送信したい.

参考までに、チャネル ack モードを手動に設定しました (つまり、container.setAcknowledgeMode(AcknowledgeMode.MANUAL);)

これが私が今していることです。

public class My***Listener implements ChannelAwareMessageListener{

try{

    @Override
    public void onMessage(Message message,Channel channel) throws Exception {   
    String s = new String(message.getBody());
    //some logic
    //after successful ack manually
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
catch(Exception e){
      //currently on exception i am unack the channel
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}

どんな助けも非常に高く評価されます。

4

3 に答える 3

3

次のようなものが必要です。

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .withMaxAttempts(5)
            .setRecoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
            .build();
}

注意: でのみ機能します。リファレンスspring-amqp 1.3+ も参照してください。

于 2014-08-18T08:57:10.357 に答える
1

宣言を使用する場合は、この回答の例を参照してください。

メッセージを にルーティングするには、 (リスナー コンテナ上で) に設定DLXできます。または、 をスローして、このメッセージを拒否する (再度キューに入れないようにする) ことをコンテナーに伝えることができます。defaultRequeuRejectedfalseAmqpRejectAndDontRequeueException

コンテナのデフォルトの動作は、拒否されたメッセージを再度キューに入れることです。

で再試行インターセプターを使用してRejectAndDontRequeueRecoverer、例外を自動的にスローできます。または、@ Jawo99 が言うように、再公開リカバリを使用できます。これには、スタック トレースをヘッダーとして追加するという追加の利点があります。DLX は元のメッセージをルーティングするだけです。

于 2014-08-18T12:41:42.630 に答える