2

spring amqp を使用して rabbitmq を使用しようとしています。以下は私の構成です。

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"  routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener"  method="onMessage" />
</rabbit:listener-container>

これは単純な Message Listener クラスです。

public class ImportMessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
        return message;
    }

}

こちらはプロデューサー(春バッチのitemWriter)、

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

春のバッチ ジョブを実行すると、各メッセージが 1 つずつ送信されて処理され、応答を下回ります。

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

5 つのメッセージを送信してキューに入れ、5 つのコンシューマー スレッド (同時実行数 = 5) がそれらを同時に処理し、完了するとすぐに応答する必要があります。

So below should be the outout

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

プロデューサーが最初のメッセージの返信を待って2番目のメッセージをキューに入れたくありません。

非同期にする convertAndSend を使用してみましたが (返信を待機しません)、 convertSendAndReceive で取得できるように、itemWriter で返信メッセージを取得するにはどうすればよいですか?

テンプレート構成を次のように変更すると、

<rabbit:template id="importAmqpTemplate"
        connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
        routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
        <rabbit:reply-listener/>
    </rabbit:template>

そして、 template.convertAndSend(item.toString()); を使用する場合 では、どうすれば返信メッセージを取得できますか?

このリスナーに独自のメッセージ ハンドラーをアタッチして、コンシューマー側でアタッチできる方法で応答メッセージを取得することはできません。返信には、デフォルトの RabbitmqTemplate ハンドラが必要です。

4

1 に答える 1