6

キャメルで交換内容に基づいたスロットルが可能かどうか知りたいのですが。

状況は次のとおりです。soapを介してWebサービスを呼び出す必要があります。そのWebサービスに送信されるパラメーターの中にはcustomerIdがあります。問題は、特定のcustomerIdに対して1分あたり1つを超えるリクエストがある場合、Webサービスがエラーを送り返すことです。

Camelを使用してcustomerIdごとにスロットリングを実装できるかどうか疑問に思っています。したがって、スロットリングはすべてのメッセージに実装するのではなく、同じcustomerIdを持つメッセージにのみ実装する必要があります。

これをどのように実装できるか、または質問を明確にする必要があるかどうかを教えてください。

4

3 に答える 3

2

ActiveMQメッセージグループは、このケースを処理するように設計されています。したがって、ルートにJMSキューホップを導入できる場合は、JMSXGroupIdヘッダーをcustomerIdに設定するだけです。次に、別のルートで、このキューから消費し、Webサービスに送信して、説明した動作を取得できます。

詳細については、 http://camel.apache.org/parallel-processing-and-ordering.htmlも参照してください...

于 2012-09-11T15:31:19.630 に答える
2

ActiveMQメッセージグループは一意の顧客IDの並列処理に確実に対応しますが、私の評価では、一意のグループごとにスロットルを導入することは、Camel/ActiveMQの未実装の機能を表すということは正しいです。

メッセージグループだけでは、説明されているSLAを満たしません。メッセージの各グループ(顧客IDによって関連付けられている)は、グループごとに1つのスレッドで順番に処理されますが、要求が応答を受信するのに1分もかからない限り、顧客ごとに1分あたり1つの要求の要件は適用されません。 。

そうは言っても、JIRAの機能要求をシミュレートする方法で、メッセージグループとスロットル戦略を組み合わせることが可能かどうかを知りたいと思います。これまでの私の試みは失敗しました。私はこれらの線に沿って何かを考えていました:

<route>
  <from uri="activemq:pending?maxConcurrentConsumers=10"/>
  <throttle timePeriodMillis="60000">
    <constant>1</constant>
    <to uri="mock:endpoint"/>
  </throttle>
</route>

ただし、スロットルは、個々のコンシューマーではなく、エンドポイントに移動する一連のリクエスト全体に適用されるようです。私は認めなければなりません、私はその振る舞いを見つけて少し驚いた。メッセージにJMSXGroupIdヘッダーに顧客IDが含まれている場合、スロットルは各コンシューマーに個別に適用され、元の質問のSLAを満たします。

于 2012-12-21T15:42:08.513 に答える
2

私は同様の問題に遭遇し、最終的にここで説明する解決策を思いつきました。

私の仮定は次のとおりです。

  • メッセージの順序は重要ではありません(ただし、リシーケンサーで解決できます)
  • 顧客IDあたりのメッセージの総量は多くないため、実行時間は飽和していません。

ソリューションアプローチ:

  • customerIDを使用して同じ顧客IDのメッセージをリストにアセンブルしながら、アグリゲーターを1分間実行します
  • Splitterを使用して、リストを個々のメッセージに分割します
  • スプリッターから実際のサービスに最初のメッセージを送信します
  • リストの残りの部分をアグリゲーターに再ルーティングします。

JavaDSLバージョンは少し理解しやすいです:

final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
        .accumulateInCollection(ArrayList.class);

from("direct:start")
    .log("Receiving ${body}")
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
        .log("Aggregate: releasing ${body}")
        .split(body())
        .choice()
            .when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
                .log("*** Processing: ${body}")
                .to("mock:result")
            .otherwise()
              .to("seda:delay")
        .endChoice();

from("seda:delay")
    .delay(0)
    .to("direct:start");

SpringXMLバージョンは次のようになります。

 <!-- this is our aggregation strategy defined as a spring bean -->
 <!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
 <bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
 <bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
     <constructor-arg value="java.util.ArrayList" />
 </bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">
       <route>
           <from uri="direct:start"/>
           <log message="Receiving ${body}"/>
           <aggregate strategyRef="_flexible2" completionTimeout="60000" >
               <correlationExpression>
                   <xpath>/order/@customerID</xpath>
               </correlationExpression>
               <log message="Aggregate: releasing ${body}"/>
               <split>
                   <simple>${body}</simple>
                   <choice>
                       <when>
                           <simple>${header.CamelSplitIndex} == 0</simple>
                           <log message="*** Processing: ${body}"/>
                           <to uri="mock:result"/>
                       </when>
                       <otherwise>
                           <log message="--- Delaying: ${body}"/>
                           <to uri="seda:delay" />
                       </otherwise>
                   </choice>
               </split>
           </aggregate>
       </route>

       <route>
           <from uri="seda:delay"/>
           <to uri="direct:start"/>
       </route>
</camelContext>
于 2016-12-21T02:21:25.757 に答える