すべてのメッセージを rabbitmq キューに入れ、リモート サーバーで処理しています。以下は、同じクラスのプロデューサーと応答ハンドラーです。
public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
MessageListener {
protected String exchange;
protected String routingKey;
protected String queue;
protected String replyQueue;
protected RabbitTemplate template;
// Reply handler
@Override
public void onMessage(Message message) {
try {
String corrId = new String(message.getMessageProperties()
.getCorrelationId(), "UTF-8");
System.out.println("received " + corrId + " from " + this.replyQueue);
} catch (IOException e) {
e.printStackTrace();
}
}
//Producer
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
System.out.println(item);
System.out.println("Queing " + item + " to " + this.queue);
Message message = MessageBuilder
.withBody(item.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setReplyTo(this.replyQueue)
.setCorrelationId(item.toString().getBytes()).build();
template.send(this.exchange, this.routingKey, message);
System.out.println("Queued " + item + " to " + this.queue);
}
// It should wait here untill we get all replies in onMessage, How can we do this ?
}
すべてのメッセージを write メソッドで送信し、onMessage で返信を取得しています。これは正常に機能していますが、書き込みは応答を待たず、呼び出し元に戻り、スプリング バッチ ステップが完了としてマークされます。
しかし、onMessage ですべての返信が得られるまで、すべてのメッセージを送信した後、プロセスが返信を待つようにしたいと考えています。どうすればこれを行うことができますか?