15

Spring Integration に大きく依存する自己実行可能な jar プログラムがあります。私が抱えている問題は、他の Spring Bean が完全に終了する前にプログラムが終了することです。

以下は、私が使用しているコードの縮小版です。必要に応じて、より多くのコード/構成を提供できます。エントリ ポイントは、Spring をブートストラップし、インポート プロセスを開始する main() メソッドです。

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter には、Spring Integration ゲートウェイにメッセージを送信する単純なループが含まれています。これにより、データのポーリングという一般的なアプローチではなく、フローへのアクティブな「プッシュ」アプローチが提供されます。これが私の問題の出番です:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

完全を期すために、フロー XML は次のようになります。

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

フローはアイテムを効果的に開始して処理しますが、startImport() ループが終了すると、メイン スレッドが終了し、すべての Spring Integration スレッドがすぐに破棄されます。これにより競合状態が発生し、プログラムの終了時に最後の (n) 項目が完全に処理されません。

処理しているアイテムの参照カウントを維持するという考えはありますが、フローがメッセージを複数のサービス アクティベーターに分割/ルーティングすることが多いため、これは非常に複雑であることがわかっています。つまり、各アイテムに「終了した"。

私が必要だと思うのは、Spring Bean がまだ実行されていないことを確認するか、ゲートウェイに送信されたすべてのアイテムが終了する前に完全に処理されたことを示すフラグを立てる方法です。

私の質問は、これらのいずれかを行うにはどうすればよいですか、または私が考えていなかった問題に対するより良いアプローチがありますか?

4

2 に答える 2

10

ここではリクエスト/レスポンス パターンを使用していません。

outbound-channel-adapter は起動して忘れるアクションです。応答を待ちたい場合は、応答を待つ outbound-gateway を使用し、応答を元のゲートウェイに接続してから、Java sendAndReceive で公開するだけではありません。

于 2012-01-19T15:19:23.833 に答える
3

Itemまだ必要かどうかを判断するを取得できる場合(processingFinished() またはバックエンド ステージで実行される同様のもの)、すべてItemの を中央機関に登録できます。 -finishedItemであり、効果的に終了条件を決定します。

このアプローチが実現可能な場合は、アイテムをFutureTaskオブジェクトにパッケージ化することや、 の同様の概念を利用することも考えられjava.util.concurrentます。

編集:2番目のアイデア:

チャネルをよりインテリジェントにすることを考えたことはありますか? 送信者は、それ以上データを送信しなくなったらチャネルを閉じます。このシナリオでは、ワーカー Bean はデーモン スレッドである必要はありませんが、閉じられた空の入力チャネルに基づいて終了基準を決定できます。

于 2012-01-19T12:04:33.963 に答える