初めて MQ を使用し、RabbitMQ を使用してロギング システムを実装しようとしています。私の実装には「送信者」が含まれます
/*
 * This class sends messages over MQ
 */
public class MQSender {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String[] levels = {"green", "orange", "red", "black"};
        for (String log_level : levels) {
            String message = "This is a " + log_level + " message";
            System.out.println("Sending " + log_level + " message");
            //publish the message with each of the bindings in levels
            channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
        }
        channel.close();
        connection.close();
    }
}
これは、色がバインディングとして使用される取引所に、私の色ごとに 1 つのメッセージを送信します。そして、それには「レシーバー」が含まれます
public class MQReceiver {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        receiveMessagesFromQueue(2);
    }
    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //generate random queue
        String queueName = channel.queueDeclare().getQueue();
        //set bindings from 0 to maxLevel for the queue
        for (int level = 0; level <= maxLevel; level++) {
            channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
        }
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while(true) {
            //waits until a message is delivered then gets that message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}
これは、取引所から供給される色のバインディングを表す数値をパラメーターとして指定します。
私の実装では、一般的にRabbitMQでは、メッセージはConsumer要求されるまで交換に保存され、その時点でそれぞれのキューに分散され、一度に1つずつクライアント(またはMQの消費者)に送信されるようです用語)。私の問題は、クラスを実行するMQSender前にクラスを実行するとMQReceiver、メッセージが配信されないことです。しかし、MQReceiver最初にクラスを実行すると、メッセージが受信されます。私の MQ の理解から、メッセージはサーバーに保存されるべきだと思います。MQReceiverクラスが実行されると、メッセージは消費者に配信されるはずですが、これは起こっていることではありません。私の主な質問は、これらのメッセージを交換に保存できるかどうかです。そうでない場合、消費者 (つまり、私のMQReceiverクラス) が呼び出されたときに配信されるように、どこに保存する必要がありますか?
ご協力いただきありがとうございます!