RabbitMQ の初心者であり、Java の初心者です。
手動 ack を使用し、Java Spring AMQP 抽象化を使用して消費者キャンセル通知を処理するリスナーを作成しようとしています。Spring 抽象化を使用して両方のタスクを達成できますか?
キューからメッセージを取得してそのメッセージを処理するリスナーを作成したいと考えています (おそらくデータベースなどに書き込みます)。メッセージの処理が失敗したり、何らかの理由で完了できない場合に、拒否して再度キューに入れることができるように、手動の確認応答を使用することを計画しました。これまでのところ、Spring AMQP を使用して手動で ack/nack/reject するには、ChannelAwareMessageListener
.
私は、RabbitMQ からのコンシューマー キャンセル通知を処理する必要があることを認識してChannelAwareMessageListener
いますが、これをコーディングする方法が実際にはわかりません。CCN を処理する唯一の方法は、メッセージ配信とキャンセルを処理できるchannel.basicConsume()
新しいインスタンスを呼び出して渡すことにより、下位レベルの Java クライアント API を使用してコードを記述することです。DefaultConsumer
clientProperties
また、構成内の Bean からファクトリを取得しているため、ConnectionFactory
(ブローカーに CCN を処理できることを伝えるために)を設定する方法もわかりません。
リスナーとコンテナの作成の私の擬似コードは以下のとおりです。
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}