0

M2MソリューションのRabbitMQのPOCに取り組んでいます。データを公開する物理デバイスが多数あります(現時点ではJavaクライアントを使用してクライアントをシミュレートします-最終的にはMQTTを介して)。したい:

  1. すべての生データをサブスクライブしてデータベースにジャーナルします
  2. データタイプごとにデータのサブセットをサブスクライブして、これらのタイプのデータのソリューションを個別にスケーリングできるようにします
  3. 交換を通じて新しいイベントを公開します(たとえば、生のイベントを取得し、それをより有用にし、システムを介して再送信します)

各メッセージにはkey:value.key:value.key:value.messageType:1のようなルーティングキーがあり、デバイスからのデータにはFROMDEVICE.MESSAGETYPE:1.key:value...などの追加のキーがあります。デバイスからの生データは、ルーティングキー#.FROMDEVICE。#(上記のケース#1)を使用して交換からキューを構築します。特定のメッセージタイプと付加価値を取得するサブスクライバーは、ルーティングキー#.MESSAGETYPE:1。#(上記のケース#2)を使用してキューを構築し、同じエクスチェンジに新しいメッセージを送信して、ルーティングキーからFROMDEVICEを削除し、 .MESSAGETYPE:1と.MESSAGETYPE:101(上記のケース#3)。次に、新しいメッセージタイプ用の独立したサブスクライバ/キューがあります。

再公開/付加価値メッセージに検索対象のroutingKeyが存在しない場合でも、デバイスからのデータのみを受信する必要があるサブスクライバーが付加価値データ(MESSAGETYPE:101)を取得していることを除いて、すべて問題ありません。

  • FROMDEVICE.MESSAGETYPE:1->
    • ルーティングキー#.FROMDEVICE。#と一致する必要があります
    • #.MESSAGETYPE:1。#と一致する必要があります
  • メッセージタイプ:101
    • ルーティングキー#.MESSAGETYPE:101。#と一致する必要があります
    • #.FROMDEVICE。#と一致してはなりません(ただし一致します)

デバイスからのデータのみをサブスクライブするコード:

public class HandlerWriteEverythingFromDevice {

private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "fromDevice";
/**
 * Writes all data from device to a data store.
 */
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.56.101");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] listens for messages from devices - durable!");

    channel.basicQos(1);

    String routingKey = "#.fromDevice.#".toUpperCase();

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
    System.out.println(" [*] subscribing to: " + routingKey);

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    boolean autoAck = false; //ack back when done
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    int msgCount = 0;
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());

        System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n     MESSAGE: '" + message + "'");
        Thread.sleep(250); //simulate some time to insert into the db.
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
}

messageType:1のみをサブスクライブし、messageType:101を再公開するコード

    private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "messageType1";
/**
 * Handler for messageType:1
 */
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.56.101");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] listens for messageType:1 and submits messageType:101");

    channel.basicQos(1);

    String routingKey = "#.messageType:1.#".toUpperCase();

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
    System.out.println(" [*] subscribing to: " + routingKey);

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    boolean autoAck = false; //ack back when done
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    int msgCount = 0;
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());

        System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n     MESSAGE: '" + message + "'");

        channel.basicPublish(EXCHANGE_NAME, 
                delivery.getEnvelope().
                        getRoutingKey().
                        replaceAll("messageType:1", "messageType:101").
                        replaceAll(".FROMDEVICE", ""). 
                        replaceAll("FROMDEVICE.", "").trim(), 
                true, 
                MessageProperties.PERSISTENT_BASIC, 
                message.getBytes());

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}

messageType:101にはパブリッシャーコードとサブスクライバーコードがありますが、この議論には必要ないと思います。キューがバインドされているチャネルに公開することが原因かどうか疑問に思いましたが、2つのチャネル(同じ接続オブジェクト)を作成しようとすると、同じ結果と非常に醜いコードが得られました。

4

1 に答える 1

1

バインディングキーに関しては、少しリベラルであることをお勧めします。もう少し明確にするために、バインディング キーとルーティング キーという用語を別の方法で使用する必要があります。ルーティング キーは、プロデューサーによって送信されるものです。バインディング キーは、キューをトピック交換にバインドするために使用するものです。

あなたが言うとき、あなたがどちらを指しているのか確信が持てないので

「ルーティングキー #.MESSAGETYPE:101.# と一致する必要があります」

ルーティングキーを使用してメッセージを送信するの#.MESSAGETYPE:101.#は悪い考えだからです。そうではないと思いますが、そうでない場合は!

これがバインディングキーであると仮定しましょう。これについては特にテストを行っていないのでわかりませんが、#前後に問題が発生している可能性があります。ルーティング キーの仕様について考える必要があります。それらが準拠しなければならない何らかの形式。拡張可能かもしれませんが、完全に無料というわけではありません。そうすれば、より詳細な制御を与える*代わりに、より具体的なバインディング キーを使用できます。#

于 2013-03-19T08:51:56.217 に答える