キューを宣言する次のコードがあります。
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);
次の Delivery オブジェクトを取得して処理するには、次のようにします。
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
}
}
デシリアライズ コード。ご覧のとおり、私は Kryo を使用しています。
public T deserialise(byte[] body) {
Kryo kryo= new Kryo();
Input input = new Input(body);
T deserialised = kryo.readObject(input, getQueueClass());
input.close();
return deserialised;
}
多数のオブジェクトを含むキューでこれを実行すると、約 270 万個のオブジェクトの後、メモリ不足の例外が発生します。これはもともと、JMeter から約 90/s の速度でデータを一晩実行して見つけたもので、最初は問題なく消費されていましたが、朝、RabbitMQ に多数のデータがあり、メモリ不足の例外が発生していることに気付きました消費者。もう一度実行し、Eclipse メモリ アナライザーを使用して、このメモリがどこで使用されているかを特定しました。このことから、com.rabbitmq.client.QueueingConsumer によって参照される java.util.concurrent.LinkedBlockingQueue がメモリ不足になるまで成長し続けていることがわかります。
リソースを解放するように Rabbit に指示するために何かする必要がありますか?
ヒープ サイズを増やすことはできますが、これは短期的な修正にすぎないのではないかと心配しています。実稼働展開の数か月後にコードにメモリ リークが発生する可能性があります。