2

この質問は 4 か月前に尋ねられました。

https://stackoverflow.com/posts/16241300/edit

誰?

「キューからすべてのメッセージを 5 分ごとに消費するように、ミュール フローでクォーツ コードを作成しました。

<quartz:inbound-endpoint jobName="abc" cronExpression="0 0/1 * * * ?" doc:name="Quartz">
               <quartz:endpoint-polling-job>
                     <quartz:job-endpoint ref="jmsEndPoint" />
               </quartz:endpoint-polling-job>
        </quartz:inbound-endpoint>

ただし、上記のコードは、キューに 5 つのメッセージがある場合でも、一度に 1 つのメッセージしか消費しません。

私の要件は、5 分ごとにジョブを実行し、キューからすべてのメッセージを消費することです。

もう 1 つの要件は、メッセージ ペイロード内の一意の識別子を使用して重複メッセージを除外することです。

どんな助けでも大歓迎です。"

編集: JMS エンドポイント

<jms:endpoint name="jmsEndPoint" queue="MyQueue" connector-ref="connector"/>
4

3 に答える 3

2

キューはイベント ベースであり、メッセージを 1 つだけ返すように設計されています (先入れ先出し)。Mule フローのキューからすべてのメッセージを消費するための 1 つの方法は、メッセージがなくなるまでキューから jms メッセージをプログラムで消費するカスタム コンポーネントを作成することです。

重複メッセージをフィルタリングするには、Mule のべき等ルーターの使用を検討してください。

http://www.mulesoft.org/documentation/display/current/Routing+Message+Processors#RoutingMessageProcessors-IdempotentMessageFilter

HTH

于 2013-09-06T13:45:09.187 に答える
1

コードを見ると、次のように読む必要があるようです。

muleEventContext.requestEvent("MyQueue", -1);

IDでフィルタリングしたい場合は、これを行うことができます:

<idempotent-message-filter idExpression="#[message:id]-#[header:foo]">
    <simple-text-file-store directory="./idempotent"/>
 </idempotent-message-filter>
于 2013-09-12T13:20:49.060 に答える
0

Mule-config xml で:

  <quartz:connector name="quartzConnector">
    <receiver-threading-profile
        maxThreadsActive="1" />
  </quartz:connector>

  <flow name="DelayedMessageProcessing">
    <quartz:inbound-endpoint name="qEP6"
                             cronExpression="${some.cron.expression}"
                             jobName="DelayedProcessing"
                             connector-ref="quartzConnector">
      <jms:transaction action="ALWAYS_BEGIN" />
      <quartz:event-generator-job />
    </quartz:inbound-endpoint

    <component class="com.something.myComponent" />
  </flow>

.. および Java コンポーネント:

package com.something;

import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;

public class MyComponent implements Callable {

    public Object onCall(final MuleEventContext muleEventContext) throws Exception {
        MuleMessage delayedMessage = fetchMessage(muleEventContext);

        while (delayedMessage != null) {
            //You might have to copy properties from inbound to outbound scope here..
            muleEventContext.dispatchEvent(delayedMessage, "some.jms.endpoint");
            delayedMessage = fetchMessage(muleEventContext);
        }

        return null;
    }

    private MuleMessage fetchMessage(final MuleEventContext muleEventContext) throws MuleException {
        return muleEventContext.requestEvent("some.delayed.jms.endpoint", 3000);
    }
}
于 2014-12-12T08:03:16.860 に答える