1

外部モジュールは、何千ものメッセージをメッセージ ブローカーに送信します。各メッセージには、5 秒に等しい TimeToLive プロパティがあります。別のモジュールがすべてのメッセージを消費して処理する必要があります。

Spring Integration のドキュメントから、Staged Event-driven アーキテクチャ (コンシューマー) が、負荷の大幅なスパイクによりよく反応することがわかりました。

私の現在の実装では、EDA (ドリブン アーキテクチャでさえも) を使用しています。

<si:channel id="inputChannel"/>

<!-- get messages from PRESENCE_ENGINE queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="inputChannel" destination="sso" connection-factory="connectionFactory"  
    max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/> 

<bean id="messageService" class="com.my.messaging.MessageService"/>

<bean id="sso" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SSO" />
</bean> 

大量のメッセージが受信されるなど、明らかに負荷が高いと、processMessage() に 5 秒以上かかる場合があります。また、MessageService はすべてのメッセージを処理できない場合があります。

私のアイデアは次のとおりです。

  1. メッセージが処理されるのではなく、MongoDB にのみ保存されるように processMessage() を変更します。その後、別のタスクでメッセージを個別に処理できました。このようなシナリオでは、MongoDB は CACHE として機能します。

  2. 多数のコンシューマー (SEDA モデル) を使用します。inputChannel は直接チャネルです。

  3. メッセージを非同期的に処理します。たとえば、inputChannel はキュー チャネルであり、メッセージは非同期的に処理されます。

決定を下す前に、どのシナリオがより効果的かお尋ねしたいと思います。おそらく、シナリオ 2) と 3) は、負荷が高くてもすべてのメッセージを処理する必要があるという私の要件を満たすメカニズムを提供します。

編集:

1 秒あたり 1000 メッセージを送信し続けるシナリオ 2 を既に実装しました。これは、さまざまなパラメーターで失われたメッセージの数の統計です。

最大同時消費者; TimeToLive=5 秒; アイドル消費者制限; 送信されたメッセージの数; 受信メッセージ数

 10 ; Yes ; 1   ; 1001 ; 297
100 ; Yes ; 1   ; 1001 ; 861
150 ; Yes ; 1   ; 1001 ; 859
300 ; Yes ; 1   ; 1001 ; 861
300 ; No  ; 1   ; 1001 ; 860
300 ; No  ; 100 ; 1001 ; 1014
300 ; No  ; 50  ; 1001 ; 1011

idle-consumer-limit は max-concurrent consumer よりも積極的に消費者を作成しているようです。これは、そのようなシナリオで idle-consumer-limit を使用する良いアプローチですか?

これは、送信者/消費者の構成ファイルです。

<!-- SENDER  
Keep Alive Sender sends messages to backup server -->    

<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>

<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage"> 
    <bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
    <si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>    

<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
    <bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>

<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter  explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}" 
    channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>

<bean id="pres" 
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="PRES" />
</bean>


<!-- RECEIVER -->

<si:channel id="receiveChannel"/>

<!-- get messages from PRES queue -->    
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter" 
    channel="receiveChannel" destination="presence" connection-factory="connectionFactory"  idle-consumer-limit="50" 
    max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/> 


<bean id="messageService" class="com.cache.MessageService"/>
4

1 に答える 1

3

まず、max-concurrent-consumersプロパティで遊んでみることができます。ご覧のとおり、あなたの場合1は本当に十分ではありません。MessageService速度が遅い理由を調査する必要があります。JMS はすでに永続的であり、非同期の性質 (キューベース) を持っているため、他のケースはオーバーヘッドのように見えます。それが役に立たない場合は、mongoDB などの presistence でチャネルを使用し<queue>てくださいMessageStore

于 2013-11-14T10:42:05.607 に答える