M2MソリューションのRabbitMQのPOCに取り組んでいます。データを公開する物理デバイスが多数あります(現時点ではJavaクライアントを使用してクライアントをシミュレートします-最終的にはMQTTを介して)。したい:
- すべての生データをサブスクライブしてデータベースにジャーナルします
- データタイプごとにデータのサブセットをサブスクライブして、これらのタイプのデータのソリューションを個別にスケーリングできるようにします
- 交換を通じて新しいイベントを公開します(たとえば、生のイベントを取得し、それをより有用にし、システムを介して再送信します)
各メッセージにはkey:value.key:value.key:value.messageType:1のようなルーティングキーがあり、デバイスからのデータにはFROMDEVICE.MESSAGETYPE:1.key:value...などの追加のキーがあります。デバイスからの生データは、ルーティングキー#.FROMDEVICE。#(上記のケース#1)を使用して交換からキューを構築します。特定のメッセージタイプと付加価値を取得するサブスクライバーは、ルーティングキー#.MESSAGETYPE:1。#(上記のケース#2)を使用してキューを構築し、同じエクスチェンジに新しいメッセージを送信して、ルーティングキーからFROMDEVICEを削除し、 .MESSAGETYPE:1と.MESSAGETYPE:101(上記のケース#3)。次に、新しいメッセージタイプ用の独立したサブスクライバ/キューがあります。
再公開/付加価値メッセージに検索対象のroutingKeyが存在しない場合でも、デバイスからのデータのみを受信する必要があるサブスクライバーが付加価値データ(MESSAGETYPE:101)を取得していることを除いて、すべて問題ありません。
- FROMDEVICE.MESSAGETYPE:1->
- ルーティングキー#.FROMDEVICE。#と一致する必要があります
- #.MESSAGETYPE:1。#と一致する必要があります
- メッセージタイプ:101
- ルーティングキー#.MESSAGETYPE:101。#と一致する必要があります
- #.FROMDEVICE。#と一致してはなりません(ただし一致します)
デバイスからのデータのみをサブスクライブするコード:
public class HandlerWriteEverythingFromDevice {
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "fromDevice";
/**
* Writes all data from device to a data store.
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messages from devices - durable!");
channel.basicQos(1);
String routingKey = "#.fromDevice.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
Thread.sleep(250); //simulate some time to insert into the db.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
messageType:1のみをサブスクライブし、messageType:101を再公開するコード
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "messageType1";
/**
* Handler for messageType:1
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messageType:1 and submits messageType:101");
channel.basicQos(1);
String routingKey = "#.messageType:1.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
channel.basicPublish(EXCHANGE_NAME,
delivery.getEnvelope().
getRoutingKey().
replaceAll("messageType:1", "messageType:101").
replaceAll(".FROMDEVICE", "").
replaceAll("FROMDEVICE.", "").trim(),
true,
MessageProperties.PERSISTENT_BASIC,
message.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
messageType:101にはパブリッシャーコードとサブスクライバーコードがありますが、この議論には必要ないと思います。キューがバインドされているチャネルに公開することが原因かどうか疑問に思いましたが、2つのチャネル(同じ接続オブジェクト)を作成しようとすると、同じ結果と非常に醜いコードが得られました。