3

私は自分のアプリケーションで次のフローを実行しています:

  1. ブローカーから 1 件のメッセージを取得 (手動承認)

  2. いくつかの処理を行います

  3. データベースとブローカーでトランザクションを開始する

  4. データベースにいくつかのレコードを挿入し、ブローカーにいくつかのメッセージを公開します (別のキュー)

  5. commit データベースとブローカー

  6. ステップ 1 でブローカーから受け取った ack メッセージ。

ブローカーでのすべての操作は、単一のチャネルを介して行われます。準備コードは次のとおりです。

Connection brokerConnection = factory.newConnection();              
Channel channel = brokerConnection.createChannel();
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("receive-queue", false, consumer);

以下は私のコードです。分かりやすくするために、、tryの部分を削除しました。catchすべての例外をファイルに記録します。ステップ1:

QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Request request = (Request) SerializationUtils.deserialize(delivery.getBody());

ステップ 2、3、4、5:

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

ステップ 6:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

1 回の反復の後、データベースとブローカーにレコードが表示されます (ステップ 5 まで正常に動作していることを意味します)。問題は、手順 6 の後に受信キューのメッセージが削除されず、管理プラグインが未確認のメッセージを 1 つ表示することです。また、ログ ファイルに例外はありません。誰でも助けることができますか?

[UPDATE1]

ここで、発行用に 1 つのチャネルを作成し、受信用に別のチャネルを作成します。これは現在機能しています。では、(トランザクションを使用して) 受信と公開に単一のチャネルを使用するにはどうすればよいでしょうか? 以前は受信と発行に単一のチャネルを使用していましたが、トランザクションはありませんでした。

[更新2]

ステップ6をトランザクション内に移動しましたが、現在は機能しています。

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

私は少し混乱しています。パブリッシュセクションをトランザクション内に配置したいだけです。

4

1 に答える 1