6

次のPHPアプリケーションがあります。これにより、ユーザーのサインアップがメッセージ キューにパブリッシュされます。Java アプリケーションはそのキューから読み取り、インポートします。うまくいけば、下の図がそれを説明します。私は Java 側の作業のみを行っています。json メッセージは既にキューに存在します。

ここに画像の説明を入力

ルート (Java 消費側)。

@Component
public class SignUpRouting {

  errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

  from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
  //.... 

プロセッサは..

@Component
public class SignupProcessor implements Processor {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception {

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    }
}

私の質問はこれです.プロセッサがメッセージのインポートに失敗した場合、どうすればよいですか。

たとえば、DAO 例外があったとします。データ フィールドが長すぎるか、インポートの形式が正しくない可能性があります。メッセージを失いたくない。エラーを確認して、インポートを再試行したいと思います。しかし、30 秒ごとにメッセージを再試行し続けたくありません。

別のキューを作成する必要があると考えています.デッドレターキューで、6時間ごとに無期限にメッセージを再試行しますか?..次に、ログを表示してエラーを確認し、修正をアップロードすると、メッセージが再処理されますか?

どうすればそれを実装できますか?それとも私は間違った道を進んでいますか?

EDIT 私はdeadLetterExchangeを設定して、物事が正しい方向に進むかどうかを確認しようとしました...しかし、エラーが発生し、キューを非nullにすることはできません

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
4

2 に答える 2

2

デッド レター ヘッダーを使用する例を次に示します。

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>

autoAckをオフにしてdeadLetterQueueを設定する必要があります。例外がスローされた場合、メッセージはデッド レター キューに入れられます。onException を使用するために、ラクダがメッセージを配信不能キューにドロップする前の再試行を制御できます。

于 2015-10-23T13:29:55.523 に答える
1

onException を使用して例外をキャッチできます。例外がある場合、メッセージはデッド レター交換にルーティングされます。Spring DSL の例を次に示します。

<onException useOriginalMessage="true">
            <exception>java.sql.SQLException</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="1" redeliveryDelay="1000"/>

            <inOnly uri="rabbitmq://localhost/dead.msgs?exchangeType=fanout&amp;
                    autoDelete=false&amp;
                    bridgeEndpoint=true"/>
</onException>
于 2015-10-22T04:13:03.627 に答える