1

ポーラーを使用してチャネルからメッセージをプルするサービスアクティベーターがあります。チャネルには、データベースへの永続ストアによってサポートされるキューがあります。ポーラーは、チャネルからのメッセージ処理にある程度の同時実行性を追加するために、タスク エグゼキューターでも構成されます。

task-executor は queue-capacity で構成されます。

ポーラーはデータベースからチャネルからメッセージを取得し、これはトランザクション対応になるように構成されているため、タスク エグゼキューターで使用可能なスレッドがなくなった場合、タスク エグゼキューターでキューに入れられるメッセージのトランザクションはどうなりますか。スレッドに対する task-executor の要求はキューに入れられます。これらのメッセージには独自のスレッドがないため、トランザクションはどうなりますか? タスク エグゼキューターによってキューに入れられたポーラーによる永続的なチャネル ストアからのメッセージの削除がコミットされると仮定します。では、サーバーがタスク エグゼキューターでキューに入れられたランナブルをキューに入れている間にサーバーに障害が発生した場合、それらは失われますか?

トランザクションの永続的なチャネル キューの考え方は、サーバーがダウンした場合にメッセージが失われないようにすることなので、キューに入れられたメッセージ (タスク エグゼキュータ内) は、チャネル データベースでバックアップされた queue/store でアクティブなトランザクションに関してどのように処理されますか?

    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="channelServerDataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="${user.name}_${channel.queue.region:default}"/>
    <property name="usingIdCache" value="false"/>
</bean> 

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>

<int:channel id="transacitonAsyncServiceQueue">
    <int:queue message-store="store"/> 
    <!--  <int:queue/>  --> 
</int:channel>

<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="500"/>
    <constructor-arg value="MILLISECONDS"/>
    <property name = "initialDelay" value = "30000"/> 
    <!-- initialDelay important to ensure channel doesnt start processing before the datasources have been initialised becuase we
         now persist transactions in the queue, at startup (restart) there maybe some ready to go which get processed before the
         connection pools have been created which happens when the servlet is first hit -->
</bean> 

<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
    <int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
        <int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
    </int:poller>
    <int:request-handler-advice-chain>
        <ref bean="databaseSessionContext" />
    </int:request-handler-advice-chain>
</int:service-activator>

<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />  
4

1 に答える 1

0

...次に、タスク エグゼキューターに使用可能なスレッドがなくなった場合、タスク エグゼキューターでキューに入れられるメッセージのトランザクションはどうなりますか。スレッドに対する task-executor の要求はキューに入れられます。これらのメッセージには独自のスレッドがないため、トランザクションはどうなりますか?

そのようには機能しません-タスクが実行されるまでトランザクションは開始されません(receive()チャネルからのものを含む)。

ポーラーは、タスクのスケジュール、タスクの開始、トランザクションの開始、作業の進行、トランザクションのコミット/ロールバックを行います。

AbstractPollingEndpointを参照してください -pollingTask.call()トランザクションが開始される場所です。

于 2015-04-23T07:56:26.540 に答える