2

春のバッチ アイテム ライターで春の amqp テンプレートを使用して、rabbitmq キューにメッセージを追加しています。

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;
    BlockingQueue<Object> blockingQueue;


    public void onMessage(Object msgContent) {

        try {
            blockingQueue.put(msgContent);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

        }

        for (T item : items) {

            Object msg = blockingQueue.poll(60, TimeUnit.SECONDS);

            if (msg instanceof Exception) {
                throw (Exception) msg;
            } else if (msg == null) {
                System.out.println("reply timeout...");
                break;
            } 

        }
    }

}

メッセージは別のリモート サーバーで処理されます。メッセージ処理が失敗した場合 (何らかの例外が原因で)、ステップの実行が停止されるユースケースを処理しようとしています。

キュー内の残りのメッセージも失敗するため、キュー内の残りのメッセージを消費して処理しないように、そのキュー内の残りのメッセージをすべてパージしたいと考えています。

ステップが失敗した場合、アイテム ライターは再びすべてのメッセージをキューに入れるため、例外が発生した場合は残りのメッセージをすべて消去する必要があります。

spring amqp を使用してキューをパージするにはどうすればよいですか?

4

3 に答える 3

10

私はそれを使用してそれを行うことができました

admin.purgeQueue(this.queue, true);

于 2014-04-27T16:39:07.487 に答える
3

代わりにRabbitAdminを使用します

http://docs.spring.io/autorepo/docs/spring-amqp-dist/1.3.4.RELEASE/api/org/springframework/amqp/rabbit/core/RabbitAdmin.html#purgeQueue%28java.lang.String, %20ブール値%29

@Autowired プライベート RabbitAdmin 管理者;

...

admin.purgeQueue("queueName", false);

于 2015-03-17T21:39:39.100 に答える
1

あなたが使用することができます

AMQP.Queue.PurgeOk queuePurge(java.lang.String queue)

"queuePurge を参照してください:

http://www.rabbitmq.com/amqp-0-9-1-quickref.html#queue.purge "

于 2014-04-27T17:31:22.940 に答える