2

私は初心者ですが、良心的になろうと思います。

{入力キュー}->[受信-ゲートウェイ-1]-->[ルーター]-------->(アクティベーター)<---------------
                                        \ /
                                         \-->{HOLD QUEUE}--->[受信ゲートウェイ-2]--^

前者のようなフローでルーティング条件を動的に変更しなければならないシナリオがあります。キューからのメッセージは、アクティベーターに送信されて処理されるか、別のキューに送信されて保留されます。ある時点で、新しいメッセージがフローに入らないように INBOUND-GATEWAY-1 を閉じ、HOLD QUEUE からのすべてのメッセージが処理されるように INBOUND-GATEWAY-2 を開く必要があります。HOLD QUEUE からのすべてのメッセージが消費されたら、両方のゲートウェイを以前と同じように閉じたり開いたりする必要があります。ここでの問題は、HOLD QUEUE が空であることをどのように知ることができるので、gateway-1 を開始できるメソッドをトリガーできるかということです。

誰かが私を助けてくれたらありがたいです。

前もって感謝します

4

2 に答える 2

1

After some debugging and reading, finally I came to a solution for this issue. An inbound-gateway is a JmsMessageDrivenEndpoint, based in two inner components, a MessageListenerContainer and a MessageListener. MessageListenerContainer is the one in charge at scheduling MessageListener behaviour so, overriding the noMessageReceived and messageReceived, and adding some attributes to control the desired behaviour, I could be able to do the "magic".

My MessageListenerContainer implementation got like this.

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{

    private JmsMessageDrivenEndpoint mainInputGateway;

    private long timeOut;

    private long lastTimeReceived;  

    public PassControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    @Override
    public void start() throws JmsException {
        /*When the container is started the lastTimeReceived is set to actial time*/
        lastTimeReceived = (new Date()).getTime();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long actualTime = (new Date()).getTime();

        if((actualTime - lastTimeReceived) >= timeOut 
                && mainInputGateway != null && !mainInputGateway.isRunning()){
            mainInputGateway.start();
        }       
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /*lastTimeReceived is set again to actual time at new message arrive*/
        lastTimeReceived = (new Date()).getTime();
        super.messageReceived(invoker, session);
    }
}

And finally, the spring bean config get like this:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>

Hope this could be helpful for someone else.

Thanks to @Nicholas for the clues.

于 2011-05-24T18:17:31.100 に答える
0

この機能をインバウンド ゲートウェイ プロセッサに組み込みます。例えば:

Gateway1Processor:

  • start() : コンシューマーをメイン キューとプロセスから開始します。
  • stop() : コンシューマーを停止します。

Gateway2Processor:

  • start() : コンシューマーを HOLD キューから開始します。適切なタイムアウトを指定してください。タイムアウトが発生したら (HOLD キューが空になると)、stop()を呼び出します。
  • stop() : Gateway1Processorを開始し、このコンシューマーを停止します。

したがって、操作シーケンスは次のようになります。

  1. Gateway1Processor の開始
  2. 特定の時間に、 Gateway1Processor.stop ()およびGateway2Processor.start( ) を呼び出します
  3. Gateway2Processorは HOLD キューを排出し、Gateway1Processorを再起動してから停止します。
  4. #2に進みます。
于 2011-05-17T13:29:04.873 に答える