6

私たちのアプリケーションは、Spring Integration Framework を使用して設計されています。完全なメッセージ アクション フローは、JMS メッセージ駆動型アダプタが使用されたキューをリッスンすることから始まります。その後、チャネル ベースのキュー エンドポイントが定義され、各エンドポイントが Service-Activators によって処理されます。

現在、パフォーマンス フェーズに入っています。200 のメッセージ リクエストを生成しています。最初に、メッセージが並列で実行されていないことを確認しました。いくつかの読み取りを行った後、concurrent-consumer および max-concurrent-consumer プロパティを JMS メッセージ駆動型リスナー アダプターに追加すると、マルチスレッド モードを有効にするのに役立つことがわかりました。確かにこれは役に立ちましたが、まだプロセスの途中でシングルスレッド効果が見られます。これは、エンドポイントが定義されている方法によるものですか? 各エンドポイントにキュー容量を追加する利点は何ですか? 各 Channel エンドポイント定義に queue-capacity を追加することで、マルチスレッド モードでの実行に再び役立つと思いますか。

要求された設計スナップショット:

アクションフロー

4

3 に答える 3

0

フロー ダイアグラムを見ると、フローには多くのシングル スレッド要素があり、うまくいけばより高いスループットでより多くの同時処理を行うように最適化できるように見えます。

メッセージ ドリブン チャネル アダプター (その構成は示していません) から始めるには、1 つ以上の既定のコンシューマーを持つように構成でき、消費サイクルごとに適切な数のメッセージを消費するようにすることができます。

メッセージ ドリブン チャネル アダプターを通過すると、メッセージをダイレクト チャネル 1 に入れるスレッドは、他の場所にバッファリングがないため、残念ながら残りのフローを実行します。そのため、メッセージが「ダイレクト チャネル 1」に入れられると、すぐに同じスレッドでルーターを呼び出してから、同じスレッドでサービス アクティベーターとメール アダプターまたは JMS 送信チャネル アダプターを呼び出します。ここでの変更は、直接チャネル 1 の代わりにキュー チャネルを導入することです。このようにして、メッセージを消費しているスレッドはメッセージをキュー チャネルに配置するだけで、それで処理が完了します。

ダイレクト チャネル 1 (キュー チャネル 1 に変更) を超えて、フローの速度に基づいてシングル スレッド化できると思います。たとえば、メール アダプターが遅い場合、ダイレクト チャネル 4 もキュー チャネルにすることができます。ダイレクトチャンネル 5

太字で強調表示されているこれらの変更がフローを改善するのに役立つかどうかを確認してください

于 2012-06-27T01:00:01.560 に答える
0

チャネルの正確な定義を確認すると役立ちます。

デフォルトでは、Spring チャネルは送信者のスレッドでメッセージを消費します。つまり同期です。チャネルでメッセージを非同期的に消費する場合は、TaskExecutor を指定する必要があります。http://static.springsource.org/spring/docs/3.0.5.RELEASE/reference/scheduling.htmlを参照

于 2012-06-26T20:28:24.090 に答える
0

パフォーマンスを向上させるために、スレッドプールサイズの数を制御するタスクエグゼキューターでエグゼキューターチャネルを使用することをお勧めします。このようにして、メッセージが jms キューに到着すると、コンシューマーがメッセージを受け取り、別のスレッドでフローを処理する状況が発生します。この種の構成では、マルチスレッド作業は taskexecutor チャネルによって実行され、別のスレッドでメッセージの受信を実行することを覚えておいてください。このため、必要なマルチスレッドのグレードをよく考えなければなりません。

キューメッセージチャネルの場合、受信を実行するためにチャネルをポーリングするポーラーが必要です。キューの容量は、舞台裏のアトミックキューの容量です。

xml でこの方法でエグゼキューター チャネルを構成できます。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:tx="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration 
        http://www.springframework.org/schema/integration/spring-integration.xsd 
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

       <tx:executor id="taskExecutor" pool-size="10" queue-capacity="10"/>
       <si:channel id="ch" >
              <si:dispatcher task-executor="taskExecutor"/>       
       </si:channel>
</beans>

またはこのようにjava-dslで

 @Bean
    public IntegrationFlow storeBookPageByPage(ConnectionFactory connectionFactory,
                                               @Qualifier("createBookQueue") Destination createBookQueue,
                                               @Qualifier("createBookResultQueue") Destination createBookResultQueue,
                                               PdfBookMasterRepository pdfBookMasterRepository,
                                               BookRepository bookRepository){
        String tempFilePathBaseDir = environment.getProperty("bookService.storeBookPageByPage.tempFilePathBaseDir");

        return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory)
                .destination(createBookQueue)
                .errorChannel(storeBookPageByPageErrorChannel()))
                .channel(channels -> channels.executor(Executors.newScheduledThreadPool(5)))
                .....

        }
于 2016-03-11T22:32:45.723 に答える