春のバッチ アイテム ライターで春の amqp テンプレートを使用して、rabbitmq キューにメッセージを追加しています。
public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {
protected String exchange;
protected String routingKey;
protected String queue;
protected String replyQueue;
protected RabbitTemplate template;
BlockingQueue<Object> blockingQueue;
public void onMessage(Object msgContent) {
try {
blockingQueue.put(msgContent);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Message message = MessageBuilder
.withBody(item.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setReplyTo(this.replyQueue)
.setCorrelationId(item.toString().getBytes()).build();
template.send(this.exchange, this.routingKey, message);
}
for (T item : items) {
Object msg = blockingQueue.poll(60, TimeUnit.SECONDS);
if (msg instanceof Exception) {
throw (Exception) msg;
} else if (msg == null) {
System.out.println("reply timeout...");
break;
}
}
}
}
メッセージは別のリモート サーバーで処理されます。メッセージ処理が失敗した場合 (何らかの例外が原因で)、ステップの実行が停止されるユースケースを処理しようとしています。
キュー内の残りのメッセージも失敗するため、キュー内の残りのメッセージを消費して処理しないように、そのキュー内の残りのメッセージをすべてパージしたいと考えています。
ステップが失敗した場合、アイテム ライターは再びすべてのメッセージをキューに入れるため、例外が発生した場合は残りのメッセージをすべて消去する必要があります。
spring amqp を使用してキューをパージするにはどうすればよいですか?