2

xml ファイルがさまざまなオブジェクトに解析される構成済みのスプリング統合パイプラインがあります。オブジェクトはいくつかのチャネル エンドポイントを通過し、そこでわずかに変更されます。特別なことはなく、いくつかのプロパティが追加されただけです。

パイプラインの最後のエンドポイントは、オブジェクトが DB に永続化される永続化機能です。重複がある可能性があるため、このエンドポイントでは、オブジェクトが既に永続化されているか、新しいオブジェクトであるかもチェックされます。シンプルなダイレクト チャネルを備えたメッセージ ドリブン アーキテクチャを使用しています。

<int:channel id="parsedObjects1" />
<int:channel id="parsedObjects2" />
<int:channel id="processedObjects" />
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" />
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" />
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" />

現時点では、xml ファイルを取得するデータ ソースは 1 つだけで、すべてが順調に進んでいます。問題は、2 番目のデータ ソースを接続する必要があるときに始まります。ファイルは同時に来るので、並行して処理したいです。そのため、2 つのパーサー インスタンスを配置し、すべてのパーサーがパイプラインを介してメッセージを送信しています。私が持っている直接チャネルを使用した構成では、同時実行の問題が発生するため、変更を試みました。春の統合ドキュメントからいくつかの構成を試しましたが、これまでのところ成功していません。

すべてのチャネル エンドポイントでメッセージごとに 1 つのスレッド - 最大プール サイズ 1 で構成されたディスパッチャで試しました。

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="parsedObjects2" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="processedObjects" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>

queue-poller 設定も試しました:

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="parsedObjects2" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="processedObjects" >
    <int:rendezvous-queue/>
</int:channel>

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>

基本的に、チャネル エンドポイント (私の場合は永続化) で競合状態を取り除きたいと考えています。永続化チャネル エンドポイントは、メッセージごとにブロックする必要があります。これを並行して実行すると、DB に永続化された多くの重複が発生するためです。

編集:

私が行ったいくつかのデバッグの後、問題は構成ではなくエンドポイントのロジックにあるようです。パイプラインを介してパーシスタに送信されるオブジェクトの一部は、ファイルの解析が完了するまでローカル キャッシュにも保存されます。後でパイプラインを介して送信され、他のドメインの一部として結合テーブルを永続化します。エンティティ。上記の構成では、オブジェクトの一部がパイプラインで 2 回目に送信されたときにまだ永続化されていなかったため、最後に DB で重複が発生しました。私は春の統合にかなり慣れていないので、おそらくこの時点でより一般的な質問をします。複数のデータ ソースを使用するセットアップでは、パーサーなどの複数のインスタンスを意味します。

  1. パイプラインを構成して並列化を有効にする一般的な方法 (最善の方法) はありますか?
  2. 必要がある場合、メッセージ処理をシリアル化する方法はありますか?

どんな提案も歓迎します。前もって感謝します。

4

3 に答える 3

1

まず、「並行性の問題」とは何か説明していただけますか? 理想的には、メッセージ処理をシリアル化する必要がないため、それから始めるのがよいでしょう。

次に、設定したスレッド プールは完全にはシリアル化されません。プールで使用可能なスレッドは 1 つありますが、選択した拒否ポリシーにより、キューが容量に達した場合に、呼び出し元のスレッドがタスク自体を実行します (基本的にスロットリング)。つまり、プールからのスレッドと同時に、呼び出し元が実行するスレッドを取得します。

于 2012-05-22T13:09:19.597 に答える
0

パイプラインを機能させることができました。現在の構成を維持するのか、それとももう少し実験するのかはわかりませんが、今のところ、これが最終的な構成です。

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:queue capacity="1000" />
</int:channel>
<int:channel id="parsedObjects2" >
<int:queue capacity="1000" />
</int:channel>
<int:channel id="processedObjects" >
<int:queue capacity="1000" />
</int:channel>

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
于 2012-05-24T10:53:26.533 に答える
0

あなたのシナリオで私が考えることができる最良の方法は、次のようなものです。

parsedObject1 と parsedObject2 を通常のキュー チャネルにします。キューの容量は適切に設定できます (いつでも 25 など)。

<int:channel id="parsedObjects1" >
    <int:queue />
</int:channel>

この時点で、2 つのチャネル (parsedObjects1 と parsedObjects2) に対する xml プロセッサが xml を処理し、processedObjectsチャネルに出力する必要があります。ここでは、processedObjects チャネルを明示的に指定したことを除いて、これと同様の構成を使用できます。

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" output-channel="processedObjects">
    <int:poller task-executor="channelTaskExecutor"/>
</int:service-activator>

3番目のステップは、構成から逸脱する場所です。この時点で、永続性をシリアル化したいと言われました。最善の方法は、プールサイズが1の異なるタスクエグゼキューターを介して実行することです。この方法では、1つのインスタンスのみ永続化機能はいつでも実行されています:

<task:executor id="persisterpool" pool-size="1"/>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="persisterpool" fixed-delay="2"/>
</int:service-activator>
于 2012-05-23T08:10:35.290 に答える