4

AMQP ブローカーにアタッチされた Spring 統合アプリがあります。

amqp-queue からメッセージを受信し、db レコードを更新したいと考えています。

パフォーマンスを向上させるために、複数の更新を同時に実行できるワーカーのプールがあります。

私は次の構成を持っています:

<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue" 
                            channel="pricehub.fixtures.priceUpdates.channel"
                            message-converter="jsonMessageConverter"/>

<int:channel id="pricehub.fixtures.priceUpdates.channel">
    <int:queue  />
</int:channel>

<int:service-activator ref="updatePriceAction" 
     method="updatePrices" 
     input-channel="pricehub.instruments.priceUpdates.channel">
    <int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>

AMQP チャネルで処理する受信メッセージがない状態でこの実行を開始すると、すぐにスレッドプールが使い果たされ、拒否が開始されることがわかります。

ログは次のとおりです。

[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'

すぐに、スレッド プールは実行を拒否し始めます。

[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
    at org.springframework.sched

uling.concurrent.ThreadPoolTask​​Executor.execute(ThreadPoolTask​​Executor.java:241) ... 12 もっと見る

犯人はここにあると思われます: BlockingQueueConsumer- メッセージの各ポーリングがメッセージが到着するまでスレッドをブロックすることを示しています...スレッドプールがすぐに使い果たされます。

これを構成する正しい方法は何ですか?

4

2 に答える 2

7

QueueChanneland pollerを使用するのではなく、単純concurrent-consumersにインバウンド アダプターの属性を増やしてみませんか?

    <xsd:attribute name="concurrent-consumers" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
Specify the number of concurrent consumers to create. Default is 1.
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues.
            </xsd:documentation>
        </xsd:annotation>
    </xsd:attribute>

そして、 と を削除し<queue/>ます<poller/>

また、ログにスレッド名を含めることを常にお勧めします ( %tlog4J の場合)。スレッド化の問題のデバッグが容易になります。

編集:

ポーラーを使用すると、スレッドが不足する理由は、ポーラーのデフォルトreceive-timeoutが 1 秒であるためです。50 ミリ秒ごとにスレッドをスケジュールしていますが、それぞれがQueueChannelで 1 秒間待機します。最終的に、タスク キューがいっぱいになります。

これを回避するには、単にreceive-timeoutto を0on に設定して<poller/>、この手法を続行したい場合 - ただし、アダプターでより高い同時実行性を使用すると、ポーリングや別のスレッドへのハンドオーバーがないため、より効率的になります。

于 2013-04-04T13:47:31.877 に答える
0

amqp-inbound キュー (pub/sub スタイルのキュー) と queue-channel の間をマッピングするためのブリッジが必要だったようです。

<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue" 
                            channel="pricehub.fixtures.priceUpdates.subpub"
                            message-converter="jsonMessageConverter"/>

<int:publish-subscribe-channel id="pricehub.fixtures.priceUpdates.subpub" />
<int:bridge input-channel="pricehub.fixtures.priceUpdates.subpub" 
    output-channel="pricehub.fixtures.priceUpdates.channel" />

<int:channel id="pricehub.fixtures.priceUpdates.channel">
    <int:queue  />
</int:channel>

<int:service-activator ref="updatePriceAction" 
     method="updatePrices" 
     input-channel="pricehub.instruments.priceUpdates.channel">
    <int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>

これは、かなり些細なタスクを達成するための大量のコードのように思えます。そのため、より良い解決策や改善のための提案があれば、ぜひ見てみたいと思います。

于 2013-04-04T13:29:41.003 に答える