1

メッセージを RabbitMQ サーバーに発行する Java アプリケーションがあります。
利用可能なディスク容量がうさぎのロー ウォーターマークを下回ると、予期しない動作が発生します。

予期される動作は、接続がブロッキングになり、アプリケーションが への呼び出しでハングすることChannel.basicPublishです。

実際の動作では、接続が管理コンソールでブロックされているように見えますが、呼び出しがChannel.basicPublishエラーなしで返され、発行されるはずだったメッセージが失われます。
この動作は、RabbitMQ の最も重要な機能である堅牢性を損ないます。

以下は、テスト用の私のアプリケーションの最小バージョンです。増分インデックス (1、2、3、...) を使用して毎秒メッセージを発行するだけです。rabbitmq.configメッセージは、ファイルに次の行を追加して、低ウォーターマークを非常に高い値に設定するまで、RabbitMQ サーバーによって問題なく受信されます。

[    
  {rabbit, [{disk_free_limit, 60000000000}]}
].

サーバーを再起動した後、管理コンソールにディスク容量不足の通知が表示され、接続が「ブロック中」とマークされ、サーバーがメッセージを受信しなくなりました。ただし、アプリケーションは何も問題がなかったかのように実行され、メッセージを送信し続けます。透かしを通常の値に戻すと、メッセージはサーバーによって再び受信されますが、接続がブロックされている間に送信されたすべてのメッセージが失われます。

  • 私は何か間違ったことをしていますか?
  • これは RabbitMQ のバグですか?
  • もしそうなら、回避策はありますか?

OS: Windows 8 64 ビット
RabbitMQ サーバー バージョン: 3.1.1
RabbitMQ Java クライアント バージョン: 3.1.0

テスト アプリケーション コード:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;

public class Main {

    private final static Logger logger = LoggerFactory.getLogger(Main.class);
    private final static String QUEUE_NAME = "testQueue";

    private static Channel channel = null;

    private static void connectToRabbitMQ() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(
                QUEUE_NAME,
                true,       // Durable - survive a server restart
                false,      // Not exclusive to this connection
                false,      // Do not autodelete when no longer in use
                null        // Arguments
        );
    }

    private static void disposeChannel()
    {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception e) {
        } finally {
            channel = null;
        }
    }

    public static void main(String[] args) throws Exception {

        boolean interrupted = false;
        int messageNumber = 1;

        while (!interrupted) {
            byte[] message = Integer.toString(messageNumber).getBytes();
            try {
                if (channel == null) {
                    connectToRabbitMQ();
                }
                channel.basicPublish(
                        "",
                        QUEUE_NAME,
                        MessageProperties.MINIMAL_PERSISTENT_BASIC,
                        message
                );
                logger.info("Published message number {}", messageNumber);
                messageNumber++;
            } catch (Exception e) {
                logger.info("Unable to connect to RabbitMQ...");
                disposeChannel();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.info("Interrupted");
                interrupted = true;
            }
        }
    }
}
4

0 に答える 0