spring-rabbit-1.3.9.RELEASE ライブラリを使用して Rabbitmq 3.3.5 の POC を実行しているときに、奇妙な動作を観察しています。
単一の生成スレッドを開始すると、物事はスムーズに実行されます。ただし、複数のスレッドを同時に開始すると、そのうちの 1 つだけが終了し、他のすべてのスレッドは、キューが空になった後でも無期限にブロックされます。
から監視すると、ブロックされたスレッドの接続のステータスは実行中のままになりますrabbitmqctl list_connections
。プロデューサがブロックするとき、または完全な実行中のその他の時点では、アラームは発生しないことに注意してください。
また、各送信後に 1 ミリ秒スリープ状態にすると、問題が解消されることも確認しました。
だから、私はこれらの質問があります
- rabbitmq は並行プロデューサーをサポートしておらず、高いレートで公開していますか?
- 接続が実際にブロックされている場合でも、rabbitmqctl list_connections に表示されないのはなぜですか?
- なぜ彼らは無期限にブロックし、ホエイ キューが空になると回復しないのですか?
コード
public static void main(String[] argv) throws java.io.IOException, InterruptedException {
init();
PocConfig config = new PocConfig();
int threadCount = config.getThreadCount();
final int eventsPerThread = config.getEvents() / threadCount;
final long sleep = config.getSleep();
System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
+ sleep + "]");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executorService.submit(new Runnable() {
public void run() {
produce(eventsPerThread, sleep, threadId);
}
});
}
waitAndTearDown(executorService);
}
private static void produce(int events, long sleep, int threadId) {
long start = System.currentTimeMillis();
for (int index = 1; index <= events; index++) {
try {
byte[] message = messageFactory.createTestMessage(index);
amqpTemplate.convertAndSend(QUEUE_NAME, message);
if (sleep > 0) {
Thread.sleep(sleep);
}
} catch (Exception e) {
LOG.error("Error", e);
}
}
long time = System.currentTimeMillis() - start;
System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
}
春の構成
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${addresses}" />
<property name="username" value="${user}" />
<property name="password" value="${passwd}" />
<property name="cacheMode" value="CONNECTION" />
<property name="connectionCacheSize" value="${threads}" />
<property name="channelCacheSize" value="10" />
</bean>
<rabbit:template id="template" connection-factory="connectionFactory"
exchange="testExchange" routing-key="testQueue"/>