1

spring-rabbit-1.3.9.RELEASE ライブラリを使用して Rabbitmq 3.3.5 の POC を実行しているときに、奇妙な動作を観察しています。

単一の生成スレッドを開始すると、物事はスムーズに実行されます。ただし、複数のスレッドを同時に開始すると、そのうちの 1 つだけが終了し、他のすべてのスレッドは、キューが空になった後でも無期限にブロックされます。

から監視すると、ブロックされたスレッドの接続のステータスは実行中のままになりますrabbitmqctl list_connections。プロデューサがブロックするとき、または完全な実行中のその他の時点では、アラームは発生しないことに注意してください。

また、各送信後に 1 ミリ秒スリープ状態にすると、問題が解消されることも確認しました。

だから、私はこれらの質問があります

  1. rabbitmq は並行プロデューサーをサポートしておらず、高いレートで公開していますか?
  2. 接続が実際にブロックされている場合でも、rabbitmqctl list_connections に表示されないのはなぜですか?
  3. なぜ彼らは無期限にブロックし、ホエイ キューが空になると回復しないのですか?

コード

    public static void main(String[] argv) throws java.io.IOException, InterruptedException {
        init();
        PocConfig config = new PocConfig();
        int threadCount = config.getThreadCount();
        final int eventsPerThread = config.getEvents() / threadCount;
        final long sleep = config.getSleep();

        System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
            + sleep + "]");

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executorService.submit(new Runnable() {
                public void run() {
                    produce(eventsPerThread, sleep, threadId);
                }
            });
        }
        waitAndTearDown(executorService);
    }

    private static void produce(int events, long sleep, int threadId)     {
        long start = System.currentTimeMillis();
        for (int index = 1; index <= events; index++) {
            try {
                byte[] message = messageFactory.createTestMessage(index);
                amqpTemplate.convertAndSend(QUEUE_NAME, message);
                if (sleep > 0) {
                    Thread.sleep(sleep);
                }
            } catch (Exception e) {
                LOG.error("Error", e);
            }
        }
        long time = System.currentTimeMillis() - start;
        System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
    }

春の構成

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="addresses" value="${addresses}" />
    <property name="username" value="${user}" />
    <property name="password" value="${passwd}" />
    <property name="cacheMode" value="CONNECTION" />
    <property name="connectionCacheSize" value="${threads}" />
    <property name="channelCacheSize" value="10" />
</bean>

<rabbit:template id="template" connection-factory="connectionFactory"
    exchange="testExchange" routing-key="testQueue"/>
4

1 に答える 1

2

それがブロックするとは考えられないので、テストを実行しました。問題はありませんでした:

Start producer with configuration [threadCount=5, events=10, sleep=0]
Producer:2 finished, events: 1000, Time(s): 0, tps: 4405
Producer:3 finished, events: 1000, Time(s): 0, tps: 4132
Producer:1 finished, events: 1000, Time(s): 0, tps: 4048
Producer:0 finished, events: 1000, Time(s): 0, tps: 3968
Producer:4 finished, events: 1000, Time(s): 0, tps: 3952

彼らがブロックされていると思う理由は何ですか?

スレッド ダンプを (jstack などを使用して) 取得し、スレッドが何を行っているかを確認します。

編集

100 万通のメッセージがあっても、まだ再現できませんCacheMode CONNECTION

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 50, tps: 3959
Producer:3 finished, events: 200000, Time(s): 53, tps: 3746
Producer:1 finished, events: 200000, Time(s): 55, tps: 3635
Producer:2 finished, events: 200000, Time(s): 55, tps: 3634
Producer:4 finished, events: 200000, Time(s): 55, tps: 3629

キューが(管理者 UI を介して)モードになるのは確認できますflowが、すべて正常に回復します。

あなたのワーカーがフロー制御下にあることがわかります...

"pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000]
 java.lang.Thread.State: RUNNABLE
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

うさぎのログに何かありますか?メッセージ レート、ステータスなどに関して、管理 UI には何が表示されますか?

とにかく、これは Spring AMQP とは何の関係もないようです。rabbitmq-usersGoogle グループの rabbitmq の人たちに連絡する必要があります。

(私はrabbitmq 3.4.2でテストしていました)。

EDIT2:

3.5.2 の完全なクリーン インストールで...

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 39, tps: 5091
Producer:1 finished, events: 200000, Time(s): 39, tps: 5002
Producer:2 finished, events: 200000, Time(s): 40, tps: 4954
Producer:3 finished, events: 200000, Time(s): 40, tps: 4951
Producer:4 finished, events: 200000, Time(s): 40, tps: 4939

そしてflow、管理UIに状態が表示されませんでした(キュー上ですが、チャネル/接続はそれらが流れていることを示していますが、再び回復しました)。

于 2015-05-30T12:00:37.823 に答える