1

初めて MQ を使用し、RabbitMQ を使用してロギング システムを実装しようとしています。私の実装には「送信者」が含まれます

/*
 * This class sends messages over MQ
 */
public class MQSender {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String[] levels = {"green", "orange", "red", "black"};
        for (String log_level : levels) {
            String message = "This is a " + log_level + " message";
            System.out.println("Sending " + log_level + " message");
            //publish the message with each of the bindings in levels
            channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}

これは、色がバインディングとして使用される取引所に、私の色ごとに 1 つのメッセージを送信します。そして、それには「レシーバー」が含まれます

public class MQReceiver {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        receiveMessagesFromQueue(2);
    }

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //generate random queue
        String queueName = channel.queueDeclare().getQueue();

        //set bindings from 0 to maxLevel for the queue
        for (int level = 0; level <= maxLevel; level++) {
            channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
        }

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while(true) {
            //waits until a message is delivered then gets that message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

これは、取引所から供給される色のバインディングを表す数値をパラメーターとして指定します。

私の実装では、一般的にRabbitMQでは、メッセージはConsumer要求されるまで交換に保存され、その時点でそれぞれのキューに分散され、一度に1つずつクライアント(またはMQの消費者)に送信されるようです用語)。私の問題は、クラスを実行するMQSender前にクラスを実行するとMQReceiver、メッセージが配信されないことです。しかし、MQReceiver最初にクラスを実行すると、メッセージが受信されます。私の MQ の理解から、メッセージはサーバーに保存されるべきだと思います。MQReceiverクラスが実行されると、メッセージは消費者に配信されるはずですが、これは起こっていることではありません。私の主な質問は、これらのメッセージを交換に保存できるかどうかです。そうでない場合、消費者 (つまり、私のMQReceiverクラス) が呼び出されたときに配信されるように、どこに保存する必要がありますか?

ご協力いただきありがとうございます!

4

1 に答える 1

2

ルーティング キーが交換にバインドされたキューと一致しない場合、RabbitMQ はメッセージを破棄します。最初に開始するMQSenderと、キューはバインドされないため、送信するメッセージは失われます。を開始するMQReceiverと、キューが交換にバインドされるため、RabbitMQ は からメッセージを送信する場所を確保しますMQSender。匿名キューを作成したため、MQReceiver を停止すると、キューとすべてのバインディングが交換から削除されます。

が実行されていないときにメッセージをサーバーに保存するMQReceiver場合は、受信側で名前付きキューを作成し、ルーティング キーをそのキューにバインドする必要があります。名前付きキューの作成はべき等であり、キューが既に存在する場合は作成されないことに注意してください。次に、レシーバーが名前付きキューからメッセージをプルする必要があります。

コードを次のように変更します。

MQSender

....
String namedQueue = "logqueue";
//declare named queue and bind log level routing keys to it.
//RabbitMQ will put messages with matching routing keys in this queue
channel.queueDeclare(namedQueue, false, false, false, null);
for (int level = 0; level < LOG_LEVELS.length; level++) {
   channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]);
}
...

MQレシーバー

...
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

QueueingConsumer consumer = new QueueingConsumer(channel);

//Consume messages off named queue instead of anonymous queue
String namedQueue = "logqueue";
channel.basicConsume(namedQueue, true, consumer);

while(true) {
...
于 2013-08-23T05:23:40.517 に答える