0

WSO2 MB 3.2.1 と共に RabbitMQ 2.8.7 DotNet クライアントを使用しています。私の展開環境は、Win7 R1 64bit または Win Server 08 R2 64bit です。Exchange への複数の接続で、複数のコンシューマーにストリーミングされるデータが断続的に失われるという問題が発生しました。WSO2 MB を 3.2.3 に更新し、交換をファンアウトに変更してこの接続の損失を解決しようとしましたが、問題が発生しました。

このリンクが多少関連しているように「見える」でしょう:

http://wso2-oxygen-tank.10903.n7.nabble.com/Qpid-Authorization-handler-does-not-allow-to-subscribe-td2080.html

これが正しいアプローチであるかどうかを最初に知りたかったのです。

次に、データにアクセスできないため、ファンアウトを使用して実行時に受け取る現在のエラーを理解したいと思いました。以下は、受信したエラー、WSO2 MB ログ出力、およびプロデューサーからのコード スニペットです。

特定するためにさらに情報が必要な場合はお知らせください。

エラーが発生しました...

 "Unhandled Exception:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation
was interrupted: AMQP close-reason, initiated by Peer, code=504, text="org.apache.qpid.AMQSecurityException: Permission denied: binding
[error code 403: access refused]", classId=50, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply()
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.v0_9_1.Model._Private_QueueBind(String queue,
 String exchange, String routingKey, Boolean nowait, IDictionary arguments)
   at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary arguments)
   at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey)
   at Send.Main()

ログ@エラーの時間...

INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,042] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 0, bodyFrame: [ConnectionStartOkBodyImpl: clientProperties={platform=[LONG_STRING: .NET], product=[LONG_STRING: RabbitMQ], capabilities=[FIELD_TABLE: {publisher_confirms=[BOOLEAN: true], exchange_exchange_bindings=[BOOLEAN: true], consumer_cancel_notify=[BOOLEAN: true], basic.nack=[BOOLEAN: true]}], copyright=[LONG_STRING: Copyright (C) 2007-2012 VMware, Inc.], information=[LONG_STRING: Licensed under the MPL.  See http://www.rabbitmq.com/], (http://www.rabbitmq.com/%5D,) version=[LONG_STRING: 0.0.0.0]}, mechanism=PLAIN, response=[0, 115, 118, 99, 46, 114, 105, 46, 97, 116, 99, 0, 112, 97, 115, 115, 119, 111, 114, 100], locale=en_US]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,042]  INFO {org.apache.qpid.server.handler.ConnectionStartOkMethodHandler} -  SASL Mechanism selected: PLAIN
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,042]  INFO {org.apache.qpid.server.handler.ConnectionStartOkMethodHandler} -  Locale selected: en_US
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,681]  INFO {org.apache.qpid.server.handler.ConnectionStartOkMethodHandler} -  Connected as: svc.abc.def
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,681]  INFO {org.apache.qpid.server.handler.ConnectionStartOkMethodHandler} -  Framesize set to 65535
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,681] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 0, bodyFrame: [ConnectionTuneOkBodyImpl: channelMax=256, frameMax=65535, heartbeat=0]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,681] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 0, bodyFrame: [ConnectionOpenBodyImpl: virtualHost=carbon, capabilities=null, insist=false]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,697] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 1, bodyFrame: [ChannelOpenBodyImpl: outOfBand=null]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,697]  INFO {org.apache.qpid.server.handler.ChannelOpenHandler} -  Connecting to: carbon
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,697] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 1, bodyFrame: [QueueDeclareBodyImpl: ticket=0, queue=fanout, passive=false, durable=false, exclusive=false, autoDelete=false, nowait=false, arguments=null]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,759]  INFO {org.apache.qpid.server.handler.QueueDeclareHandler} -  Queue fanout bound to default exchange(<<default>>)
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,759]  INFO {org.apache.qpid.server.handler.QueueDeclareHandler} -  Queue fanout declared successfully
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,759] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 1, bodyFrame: [QueueBindBodyImpl: ticket=0, queue=fanout, exchange=fanout, routingKey=null, nowait=false, arguments=null]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,759]  INFO {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Closing channel due to: org.apache.qpid.AMQSecurityException: Permission denied: binding  [error code 403: access refused]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,759]  INFO {org.apache.qpid.server.AMQChannel} -  No consumers to unsubscribe on channel [/192.168.1.14:56087(svc.abc.def):1]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,775] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 1, bodyFrame: [ChannelCloseOkBodyImpl: ]
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,775]  INFO {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Channel[1] awaiting closure - processing close-ok
INFO   | jvm 1    | 2013/03/01 11:29:27 | [2013-03-01 11:29:27,775]  INFO {org.apache.qpid.server.handler.ChannelCloseOkHandler} -  Received channel-close-ok for channel-id 1
INFO   | jvm 1    | 2013/03/01 11:29:29 | [2013-03-01 11:29:29,070] DEBUG {org.apache.qpid.server.protocol.AMQProtocolEngine} -  Frame Received: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=200, replyText=Connection close forced, classId=0, methodId=0]
INFO   | jvm 1    | 2013/03/01 11:29:29 | [2013-03-01 11:29:29,070]  INFO {org.apache.qpid.server.handler.ConnectionCloseMethodHandler} -  ConnectionClose received with reply code/reply text 200/Connection close forced for /192.168.1.14:56087(svc.abc.def)

この問題は、Java に接続しようとしたときにも発生することがわかりました。プロデューサーの C# コードの簡単な例を以下に示します。

class Send {

    public static void Main() {

        string serverAddress = "amqp://" + "192.168.1.12" + ":5672/carbon"; 

            ConnectionFactory factory = new ConnectionFactory();

            factory.Uri = serverAddress;

            using (IConnection connection = factory.CreateConnection())

            using (IModel channel = connection.CreateModel()) {

            channel.QueueDeclare("fanout", false, false, false, null);

            channel.QueueBind("fanout", "fanout", "");

            string message = "Hello World!";

            byte[] body = System.Text.Encoding.UTF8.GetBytes(message);

            channel.BasicPublish("", "hello", null, body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}
4

1 に答える 1

1

WSO2 MB では、指定された名前に対して交換が動的に作成されることはありません。したがって、「ファンアウト」交換でこの作業を行うには、qpid-virtualhosts.xml ファイルで事前に宣言する必要があります。それ以外の場合は、WSO2 MB のデフォルトの交換を使用します。

キューをバインドする必要がある交換名を正しく宣言していないようです。キューの場合、デフォルトの交換名の宣言は次のようになります。

 ch.ExchangeDeclare("amq.direct", "direct");

トピックの場合は、`

 ch.ExchangeDeclare("amq.topic", "topic");

サンプルの .Net/C# Consumer/Publisher クライアントを WSO2 Message Broker で使用する方法については、このブログ投稿を参照してください。

于 2013-08-18T12:37:31.357 に答える