初めて MQ を使用し、RabbitMQ を使用してロギング システムを実装しようとしています。私の実装には「送信者」が含まれます
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
これは、色がバインディングとして使用される取引所に、私の色ごとに 1 つのメッセージを送信します。そして、それには「レシーバー」が含まれます
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
これは、取引所から供給される色のバインディングを表す数値をパラメーターとして指定します。
私の実装では、一般的にRabbitMQでは、メッセージはConsumer
要求されるまで交換に保存され、その時点でそれぞれのキューに分散され、一度に1つずつクライアント(またはMQの消費者)に送信されるようです用語)。私の問題は、クラスを実行するMQSender
前にクラスを実行するとMQReceiver
、メッセージが配信されないことです。しかし、MQReceiver
最初にクラスを実行すると、メッセージが受信されます。私の MQ の理解から、メッセージはサーバーに保存されるべきだと思います。MQReceiver
クラスが実行されると、メッセージは消費者に配信されるはずですが、これは起こっていることではありません。私の主な質問は、これらのメッセージを交換に保存できるかどうかです。そうでない場合、消費者 (つまり、私のMQReceiver
クラス) が呼び出されたときに配信されるように、どこに保存する必要がありますか?
ご協力いただきありがとうございます!