10

RabbitMQ の初心者であり、Java の初心者です。

手動 ack を使用し、Java Spring AMQP 抽象化を使用して消費者キャンセル通知を処理するリスナーを作成しようとしています。Spring 抽象化を使用して両方のタスクを達成できますか?

キューからメッセージを取得してそのメッセージを処理するリスナーを作成したいと考えています (おそらくデータベースなどに書き込みます)。メッセージの処理が失敗したり、何らかの理由で完了できない場合に、拒否して再度キューに入れることができるように、手動の確認応答を使用することを計画しました。これまでのところ、Spring AMQP を使用して手動で ack/nack/reject するには、ChannelAwareMessageListener.

私は、RabbitMQ からのコンシューマー キャンセル通知を処理する必要があることを認識してChannelAwareMessageListenerいますが、これをコーディングする方法が実際にはわかりません。CCN を処理する唯一の方法は、メッセージ配信とキャンセルを処理できるchannel.basicConsume()新しいインスタンスを呼び出して渡すことにより、下位レベルの Java クライアント API を使用してコードを記述することです。DefaultConsumer

clientPropertiesまた、構成内の Bean からファクトリを取得しているため、ConnectionFactory(ブローカーに CCN を処理できることを伝えるために)を設定する方法もわかりません。

リスナーとコンテナの作成の私の擬似コードは以下のとおりです。

public class MyChannelAwareListener implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        msgProcessed = processMessage(message);

        if(msgProcessed)    
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        else
           channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);  
    }
}

public static void main(String[] args) throws Exception
{
    ConnectionFactory rabbitConnectionFactory;
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext   (MY_CONTEXT_PATH);
    rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

    MyChannelAwareListener listener = new MyChannelAwareListener();
    container.setMessageListener(listener);
    container.setQueueNames("myQueue");
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.start();
}
4

1 に答える 1

2

setClientPropertiesクライアント プロパティを設定するには、 のメソッドを使用する必要がありますConnectionFactory(この ConnectionFactory が RabbitMQ Java ライブラリのオブジェクトであると仮定します)。このメソッドはMap<String, Object>、クライアントのプロパティと機能を含む を想定しています。次の行は、RabbitMQ Java ライブラリ内のデフォルト値です。

Map<String,Object> props = new HashMap<String, Object>();
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
props.put("platform", LongStringHelper.asLongString("Java"));
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));

Map<String, Object> capabilities = new HashMap<String, Object>();
capabilities.put("publisher_confirms", true);
capabilities.put("exchange_exchange_bindings", true);
capabilities.put("basic.nack", true);
capabilities.put("consumer_cancel_notify", true);

props.put("capabilities", capabilities);

channel.basicConsumeACK の管理とコンシューマのキャンセルについては、Spring AMQP 抽象化でそれを行う方法がわかりませんが、すべてのコールバック メソッドを介してすべてのシナリオを処理する可能性を与える完全に実行可能です。

http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/

お役に立てれば!

于 2013-10-17T01:01:39.303 に答える