1

JDBCMessageStore を使用するときの Aggregator の動作を理解しようとしています。以下は私のユースケースです:

1) キューからメッセージ (注文の詳細) を読み取ります。

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
  <orderItem>
    <isbn>12333454443</isbn>
    <quantity>4</quantity>
  </orderItem>
  <orderItem>
    <isbn>545656777</isbn>
    <quantity>50</quantity>
  </orderItem>
  ..
  ..
</order>

1 つの注文メッセージには複数のorderItemが含まれます。

2) orderItemの数に基づいて注文メッセージを分割する

3) 4 つのorderItemメッセージに集約します。messageCountReleaseStrategy は、しきい値 = 4 で使用されます

4) 集約されたメッセージをサービス アクティベーターに転送します。

INT_MESSAGE_GROUP、INT_MESSAGE、および INT_GROUP_TO_MESSAGE の 3 つのテーブルのデータを観察していましたが、これらの 3 つのテーブルに最初にデータが挿入され、グループが解放されると、これらのテーブルから日付が削除/削除されるようです。

テーブルの 1 つ (INT_GROUP_TO_MESSAGE) で DB 挿入の失敗を確認したかったのです。エラーをシミュレートするために、テーブル INT_GROUP_TO_MESSAGE を削除しました。

以下のテストが実行されました。

1) 3 つのorderItemで 1 つの注文メッセージを投稿

2) メッセージでエラーが発生しました - テーブルが存在しません - (INT_GROUP_TO_MESSAGE が存在しないと予想されます)

3) 2 つのテーブルでレコードを見つけることができました

a) INT_MESSAGE_GROUP には 1 つのレコードがありました。

b) INT_MESSAGE には 23 のレコードがありました。メッセージが複数回再試行されたため、複数のエントリが行われたようです。

4) テーブル INT_GROUP_TO_MESSAGE を作成しました

5) 1 件のorderItemで 1 件の注文メッセージを投稿

6) 元のメッセージ (ステップ 1) と新しいメッセージ (ステップ 5) がキューから取得され、処理されました。

7) アグリゲーターは、グループに 4 つのメッセージをリリースしました。

8) ただし、データベース テーブル - INT_MESSAGEにはまだ 23 のレコードが存在しますが、他のテーブルは空です。

テーブルを削除するときに Db エラーをシミュレートしましたが、本番環境では、これらの挿入が失敗する原因となるメモリ、テーブルスペースの問題が発生する可能性があります。

私の質問は、INT_MESSAGE_GROUP、INT_MESSAGE、および INT_GROUP_TO_MESSAGE の 3 つのテーブルにトランザクションがあるように、任意のパラメーターを設定できますか?

以下は私の構成です

<int-jms:message-driven-channel-adapter id="jmsIn"
                                      channel="mqInbound"
                                      destination="requestQueue" 
                                      message-converter="orderMessageConverter"/>


<int:splitter input-channel="mqInbound"  output-channel="item" expression="payload.orderItem"/>


<int:aggregator input-channel="item"                 output-channel="itemList"
              ref="orderAggregator"                  method="sendList"
              correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"
              expire-groups-upon-completion="true"   release-strategy="messageCountReleaseStrategy" 
              message-store="messageStore"           discard-channel="aggregatorDiscardChannel" />


<int-jdbc:message-store id="messageStore" data-source="jdbcDatasource"  table-prefix="MY_INT_"/>

  <bean id="messageCountReleaseStrategy"     class="org.springframework.integration.aggregator.MessageCountReleaseStrategy">
    <constructor-arg index="0" value="4"/>
  </bean>
4

0 に答える 0