10

キューを宣言する次のコードがあります。

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);

次の Delivery オブジェクトを取得して処理するには、次のようにします。

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        }
    }

デシリアライズ コード。ご覧のとおり、私は Kryo を使用しています。

public T deserialise(byte[] body) {
    Kryo kryo= new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}

多数のオブジェクトを含むキューでこれを実行すると、約 270 万個のオブジェクトの後、メモリ不足の例外が発生します。これはもともと、JMeter から約 90/s の速度でデータを一晩実行して見つけたもので、最初は問題なく消費されていましたが、朝、RabbitMQ に多数のデータがあり、メモリ不足の例外が発生していることに気付きました消費者。もう一度実行し、Eclipse メモリ アナライザーを使用して、このメモリがどこで使用されているかを特定しました。このことから、com.rabbitmq.client.QueueingConsumer によって参照される java.util.concurrent.LinkedBlockingQueue がメモリ不足になるまで成長し続けていることがわかります。

リソースを解放するように Rabbit に指示するために何かする必要がありますか?

ヒープ サイズを増やすことはできますが、これは短期的な修正にすぎないのではないかと心配しています。実稼働展開の数か月後にコードにメモリ リークが発生する可能性があります。

4

4 に答える 4

7

私の間違いは、チャンネルを自動確認に設定していたことです。これは、Rabbit からのすべてのメッセージが受信確認されたことを意味します。チャネルを自動確認しないように宣言することでこれを修正 (およびテスト) し、 channel.basicConsume(getQueueName(), false,consumer);キューを処理した後、メッセージを確認します: consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

これは、私のキュー宣言が次のようになったものです。

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false,consumer);

およびキューを処理するための次のとおりです。

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());
            process(queue);
            consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        } catch (IOException e) {
            logger.error("Could not ack message: {}",e);
            break;
        }
    }

RabbitMQ の管理画面で、メッセージが非常に高い速度で配信されているのに、その速度で確認応答されていないことがわかります。その後、コンシューマーを強制終了すると、約 30 秒以内に、ACK されていないメッセージがすべて Ready キューに戻されます。私が行う改善の 1 つは、basicQos 値を設定することです。 channel.basicQos(10);これにより、配信されたが確認応答されないメッセージが多すぎないようにします。これは、別のコンシューマーを同じキューに起動してキューの処理を開始できることを意味するため、すべてがメモリ内で確認応答されず、他のコンシューマーが利用できなくなるのではなく、望ましいことです。

于 2012-10-08T12:04:17.167 に答える
2

解決策は、basicQos - を設定することですchannel.basicQos(2);。私のチャネル宣言は次のようになります。

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), true,consumer);
        channel.basicQos(2);

basicQos を 2 に設定すると、内部メモリに 2 つのメッセージのみが保持されます。CoDel アルゴリズムの使用に関する詳細と興味深い議論については、http: //www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/ を参照してください。

于 2012-10-02T10:59:34.600 に答える
1

これは、オブジェクトが消費された後に破棄されないという問題である可能性があります。デシリアライズのコードを教えてください。キューを介してオブジェクトを送信し、ある種のオブジェクト入力ストリーム/バイト配列入力ストリームを使用してそれらを逆シリアル化していると思われます。ストリームを適切に閉じていないと、メモリ リークが発生する可能性があります。

于 2012-10-02T09:53:59.830 に答える
1

問題は、コンシューマーがプロデューサーに追いつけず、キューが無制限に大きくなることです。キューのサイズを制限し、制限に達したときにプロデューサーを遅くする必要があります。また、消費者が追いつくことができるように最適化することも検討します。

于 2012-10-02T09:06:35.347 に答える