2

JavaでRabbitMQを利用してかなり基本的なアプリケーションを実行しようとしています。Javaを使用してメッセージを同時に消費したいのですがExecutorService。私のプロジェクトはSpringを使用してThreadPoolExecutorFactoryBeanいるので、次のように定義しました。

<bean class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"
        destroy-method="destroy">
    <property name="corePoolSize" value="8"/>   
    <property name="keepAliveSeconds" value="600"/>
    <property name="maxPoolSize" value="16"/>
    <property name="threadGroupName" value="CallbackQueue-Group"/>
    <property name="threadNamePrefix" value="CallbackQueue-Worker-"/>
</bean>

このBeanを、次のようなことをしているクラスを利用してメインのメッセージキューに注入しています。

this.connection = getConnectionFactory().newConnection(getQueueExecutor());
this.channel = this.connection.createChannel();
this.channel.queueDeclare(getQueueName(), true, false, false, null);
this.channel.basicConsume(getQueueName(), false, new DefaultConsumer(this.channel) {
    @Override public void handleDelivery(String consumerTag, Envelope envelope, 
            BasicProperties properties, byte[] body) throws IOException {
        logger.debug("Received message {}", properties.getCorrelationId());

        try { Thread.sleep(3000); } catch (InterruptedException e) {};

        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
});

簡単に言えば、キュ​​ーに複数のメッセージを投稿すると、タスクの実行に時間がかかる場合でも、ログステートメントがかなり接近して発生することがわかります。ただし、 !にもかかわらず、コンシューマーが一度に1つのタスクしか処理していないことがわかります。ExecutorServiceさらに奇妙なのは、プールサービスのさまざまなスレッドがキューに表示されていることです。ただし、同時に発生することはありません。

12:43:40.650 [CallbackQueue-Worker-2] DEBUG MyApplication - Received message 65bfbba29b4965eb0674c082c73dad7c
12:43:43.737 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 2a0b29012b13857c5a0ae8060f66dbaa
12:43:46.755 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 3c0742f9a284ac9c6b602200254c70db
12:43:49.769 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message a462236fab19d51ba4bfea1582410a64
12:43:52.783 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 1a4713e1066dfc9e4ec1302098450a1f

私はここで何が間違っているのですか?ThreadPoolExecutorFactoryBean私のまたは私のRabbitMQコードのいずれかで見逃した追加の構成はありますか?

4

1 に答える 1

1

com.rabbitmq.client.Channel の説明から:

Channel は複数のスレッドで使用できますが、一度に 1 つのスレッドだけがコマンドを実行するようにすることが重要です。コマンドを同時に実行すると、UnexpectedFrameError がスローされる可能性があります。

これが理由でしょうか?ログは、異なるワーカーが使用されていることを示しています (2 と 3 が表示されます) が、一度に 1 つのワーカーのみです。

于 2012-07-26T20:22:33.917 に答える