spring amqp を使用して rabbitmq を使用しようとしています。以下は私の構成です。
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" routing-key="${rabbitmq.import.queue}"/>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" method="onMessage" />
</rabbit:listener-container>
これは単純な Message Listener クラスです。
public class ImportMessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
return message;
}
}
こちらはプロデューサー(春バッチのitemWriter)、
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template;
}
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}
}
春のバッチ ジョブを実行すると、各メッセージが 1 つずつ送信されて処理され、応答を下回ります。
consumer output: 1
producer output: 1
consumer output: 2
producer output: 2
consumer output: 3
producer output: 3
consumer output: 4
producer output: 4
consumer output: 5
producer output: 5
5 つのメッセージを送信してキューに入れ、5 つのコンシューマー スレッド (同時実行数 = 5) がそれらを同時に処理し、完了するとすぐに応答する必要があります。
So below should be the outout
consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5
producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5
プロデューサーが最初のメッセージの返信を待って2番目のメッセージをキューに入れたくありません。
非同期にする convertAndSend を使用してみましたが (返信を待機しません)、 convertSendAndReceive で取得できるように、itemWriter で返信メッセージを取得するにはどうすればよいですか?
テンプレート構成を次のように変更すると、
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
<rabbit:reply-listener/>
</rabbit:template>
そして、 template.convertAndSend(item.toString()); を使用する場合 では、どうすれば返信メッセージを取得できますか?
このリスナーに独自のメッセージ ハンドラーをアタッチして、コンシューマー側でアタッチできる方法で応答メッセージを取得することはできません。返信には、デフォルトの RabbitmqTemplate ハンドラが必要です。