0

すべてのメッセージを rabbitmq キューに入れ、リモート サーバーで処理しています。以下は、同じクラスのプロデューサーと応答ハンドラーです。

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
        MessageListener {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;

    // Reply handler 
    @Override
    public void onMessage(Message message) {

        try {
            String corrId = new String(message.getMessageProperties()
                    .getCorrelationId(), "UTF-8");
            System.out.println("received " + corrId + " from " + this.replyQueue);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //Producer
    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {
            System.out.println(item);

            System.out.println("Queing " + item + " to " + this.queue);

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

            System.out.println("Queued " + item + " to " + this.queue);

        }

        // It should wait here untill we get all replies in onMessage, How can we do this ?

    }

すべてのメッセージを write メソッドで送信し、onMessage で返信を取得しています。これは正常に機能していますが、書き込みは応答を待たず、呼び出し元に戻り、スプリング バッチ ステップが完了としてマークされます。

しかし、onMessage ですべての返信が得られるまで、すべてのメッセージを送信した後、プロセスが返信を待つようにしたいと考えています。どうすればこれを行うことができますか?

4

1 に答える 1

0

同期手法はいくつでも使用できます。たとえば、すべての返信が受信されるまで、リスナーputに返信をLinkedBlockingQueue送信し、キューから送信者take(またはタイムアウト付き) を送信します。poll

または、リスナーをまったく使用せずに、すべての応答が受信されるまで応答キューから同じRabbitTemplateものを使用します。receive()

ただし、キューが空の場合はreceive()返さnullれるため、CPU のスピンを避けるために受信間でスリープする必要があります。

于 2014-04-25T14:06:13.760 に答える