3

これは私の現在の春のamqp構成です

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"/>

<rabbit:admin id="rabbitmqAdmin" connection-factory="rabbitConnectionFactory" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory">
</rabbit:template>

これは私の交換、キュー、リスナー、replyQueues、replyHandlers の構成です。

<rabbit:queue name="${process1.queue}" />
<rabbit:queue name="${process1.reply.queue}" />

<rabbit:queue name="${process2.queue}" />
<rabbit:queue name="${process2.reply.queue}" />

<rabbit:direct-exchange name="${myExchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${process1.queue}"
            key="${process1.routing.key}" />
        <rabbit:binding queue="${process2.queue}"
            key="${process2.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${my.listener.concurrency}"
    requeue-rejected="false">
    <rabbit:listener queues="${process1.queue}"
        ref="foundation" method="process1" />
    <rabbit:listener queues="${process2.queue}"
        ref="foundation" method="process2s" />
</rabbit:listener-container>


<beans:beans profile="master">

    <beans:bean id="process1Lbq" class="java.util.concurrent.LinkedBlockingQueue" />
    <beans:bean id="process2Lbq" class="java.util.concurrent.LinkedBlockingQueue" />

    <beans:bean id="process1sReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process1Lbq" />

    <beans:bean id="process2ReplyHandler"
        class="com.stockopedia.batch.foundation.ReplyHandler"
        p:blockingQueue-ref="process2Lbq" />

    <rabbit:listener-container
        connection-factory="rabbitConnectionFactory" concurrency="1"
        requeue-rejected="false">
        <rabbit:listener queues="${process1.reply.queue}"
            ref="process1sHandler" method="onMessage" />
        <rabbit:listener queues="${process2.reply.queue}"
            ref="process2ReplyHandler" method="onMessage" />
    </rabbit:listener-container>

</beans:beans>

これを6つの異なるサーバーに設定し、マスターサーバーからのメッセージのみをキューに入れました. 他のサーバーはメッセージのみを処理しています。すべてのサーバーには、同時実行で設定された数のリスナーが実行されています。

問題は、メッセージの処理に異なる時間がかかることです。一部のメッセージには時間がかかります。そのため、現在、一部のサーバーは、それらのサーバー上のすべてのリスナーがメッセージの処理を完了していても、キューからメッセージを取得しません。

保留中のメッセージが処理されるキューにあり、一部のサーバーがアイドル状態になっていることがわかります。他のサーバーがメッセージの処理でビジー状態になっている間に、それらのサーバーが残りのメッセージを取得するようにします。

チュートリアルhttp://www.rabbitmq.com/tutorials/tutorial-two-java.html (Fair Dispatch)に記載されているように、basic_Quos を設定する必要がありますか?

int prefetchCount = 1;
channel.basicQos(prefetchCount);

それともSpring ampqのデフォルトですか?そうでない場合、どうすればいいですか?

4

1 に答える 1

1

basicQos(1)リスナー コンテナのデフォルト設定です。prefetchコンテナに設定することで変更できます。

保留中のメッセージが処理されるキューにあり、一部のサーバーがアイドル状態になっていることがわかります。

アイドル状態のコンシューマーがいる場合、メッセージがキューに留まっているだけでは表示されません。メッセージが未確認としてマークされている場合、メッセージは処理中です。

DEBUG レベルのロギングをオンにすると、アイドル状態のコンシューマーが新しい配信のために内部キューをポーリングしているのを確認できます。

于 2014-05-19T12:42:27.243 に答える