xml ファイルがさまざまなオブジェクトに解析される構成済みのスプリング統合パイプラインがあります。オブジェクトはいくつかのチャネル エンドポイントを通過し、そこでわずかに変更されます。特別なことはなく、いくつかのプロパティが追加されただけです。
パイプラインの最後のエンドポイントは、オブジェクトが DB に永続化される永続化機能です。重複がある可能性があるため、このエンドポイントでは、オブジェクトが既に永続化されているか、新しいオブジェクトであるかもチェックされます。シンプルなダイレクト チャネルを備えたメッセージ ドリブン アーキテクチャを使用しています。
<int:channel id="parsedObjects1" />
<int:channel id="parsedObjects2" />
<int:channel id="processedObjects" />
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" />
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" />
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" />
現時点では、xml ファイルを取得するデータ ソースは 1 つだけで、すべてが順調に進んでいます。問題は、2 番目のデータ ソースを接続する必要があるときに始まります。ファイルは同時に来るので、並行して処理したいです。そのため、2 つのパーサー インスタンスを配置し、すべてのパーサーがパイプラインを介してメッセージを送信しています。私が持っている直接チャネルを使用した構成では、同時実行の問題が発生するため、変更を試みました。春の統合ドキュメントからいくつかの構成を試しましたが、これまでのところ成功していません。
すべてのチャネル エンドポイントでメッセージごとに 1 つのスレッド - 最大プール サイズ 1 で構成されたディスパッチャで試しました。
<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="parsedObjects2" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="processedObjects" >
<int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
queue-poller 設定も試しました:
<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
<int:rendezvous-queue/>
</int:channel>
<int:channel id="parsedObjects2" >
<int:rendezvous-queue/>
</int:channel>
<int:channel id="processedObjects" >
<int:rendezvous-queue/>
</int:channel>
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
<int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1" fixed-rate="2" />
</int:service-activator>
基本的に、チャネル エンドポイント (私の場合は永続化) で競合状態を取り除きたいと考えています。永続化チャネル エンドポイントは、メッセージごとにブロックする必要があります。これを並行して実行すると、DB に永続化された多くの重複が発生するためです。
編集:
私が行ったいくつかのデバッグの後、問題は構成ではなくエンドポイントのロジックにあるようです。パイプラインを介してパーシスタに送信されるオブジェクトの一部は、ファイルの解析が完了するまでローカル キャッシュにも保存されます。後でパイプラインを介して送信され、他のドメインの一部として結合テーブルを永続化します。エンティティ。上記の構成では、オブジェクトの一部がパイプラインで 2 回目に送信されたときにまだ永続化されていなかったため、最後に DB で重複が発生しました。私は春の統合にかなり慣れていないので、おそらくこの時点でより一般的な質問をします。複数のデータ ソースを使用するセットアップでは、パーサーなどの複数のインスタンスを意味します。
- パイプラインを構成して並列化を有効にする一般的な方法 (最善の方法) はありますか?
- 必要がある場合、メッセージ処理をシリアル化する方法はありますか?
どんな提案も歓迎します。前もって感謝します。