1

spring amqp を使用して rabbitmq を使用しようとしています。以下は私の構成です。

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
    class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>

これは単純な Message Listener クラスです。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }

}

こちらはプロデューサー(春バッチのitemWriter)、

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

春のバッチ ジョブを実行すると、ImportItemWriter.write が呼び出されます。ただし、 ImportMessageListener.onMessage は機能しません。メッセージを出力しません。コンソールのすべての項目について以下の出力が得られます

producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
4

1 に答える 1

5

あなたの消費者は結果を送信していません...

@Override
public void onMessage(Message message) {
    System.out.println("consumer output: " + message);
}

シンプルな POJO に変更します。コンテナMessageListenerAdapterが変換を処理し、結果を送信します。

@Override
public String handleMessage(String message) {
    System.out.println("consumer output: " + message);
    return "result";
}

編集:

また、キューへの交換やルーティングも設定していません。デフォルトの交換/ルーティングを使用する場合は、...

convertSendAndReceive("", queueName, item.toString());

EDIT2:

または、routingKeyテンプレートの をキュー名に設定すると、より簡単な方法を使用できます。

...sendAndReceive()メソッドはリクエスト/リプライのシナリオ向けであるため、ブロッキングが必要です。非同期で行うには、メソッドの 1 つを使用...send()し、独自のメソッドを接続してSimpleListenerContainer応答を受信する必要があります。独自の関連付けを行う必要があります。使用する

public void convertAndSend(Object message, MessagePostProcessor postProcessor)

メッセージ ポスト プロセッサで、ヘッダーreplyTocorrelationIdヘッダーを設定します...

message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");

または、Message自分でオブジェクトを作成し (たとえば、 を使用してMessageBuilder)、sendメソッドを使用します...

template.send(MessageBuilder.withBody("foo".getBytes())
            .setReplyTo("bar")
            .setCorrelationId("baz".getBytes())
            .build());

correlationId応答を相互に関連付けるために、各要求には固有のものが必要です。

于 2014-04-20T23:25:12.980 に答える