1

永続的なメッセージをRabbitMQエクスチェンジに送信するプロデューサーがいます。RabbitMQのメモリまたはディスクが透かしのしきい値を超えると、RabbitMQはプロデューサーをブロックします。ドキュメントには、ソケットからの読み取りを停止し、ハートビートも一時停止すると記載されています。

私が欲しいのは、私がブロックされていることをプロデューサーコードで知る方法です。現在、ハートビートが有効になっている場合でも、すべてが永久に一時停止します。ブロックされていることを認識し、ユーザーに警告したり、その他のアクションを実行したりできるように、ある種の例外を受け取りたいのですが、これを行う方法が見つかりません。私はJavaクライアントとC#クライアントの両方を使用しており、両方でこの機能が必要になります。何かアドバイス?ありがとう。

4

2 に答える 2

0

Rabbitmq は、無期限に応答をリッスンするブロッキング rpc 呼び出しを使用します。

Java クライアント API を見ると、次のようになります。

    AMQChannel.BlockingRpcContinuation k = new AMQChannel.SimpleBlockingRpcContinuation();
    k.getReply(-1);

応答が受信されるまで -1 が引数ブロックに渡されるようになりました。

良いことは、タイムアウトを返すためにタイムアウトを渡すことができることです。悪い点は、クライアント jar を更新する必要があることです。

それを行っても問題ない場合は、上記のようなブロッキング呼び出しが行われるたびにタイムアウトを渡すことができます。コードは次のようになります。

try {
                    return k.getReply(200);
                } catch (TimeoutException e) {
                    throw new MyCustomRuntimeorTimeoutException("RabbitTimeout ex",e);
                }

コードでは、この例外を処理し、このイベントでロジックを実行できます。

この修正が必要になる可能性があるいくつかの関連クラスは次のとおりです。

com.rabbitmq.client.impl.AMQChannel
com.rabbitmq.client.impl.ChannelN
com.rabbitmq.client.impl.AMQConnection

参考:私はこれを試してみましたが、うまくいきます。

于 2013-06-14T08:16:05.380 に答える
0

申し訳ありませんが、RabbitMQ (少なくとも 2.8.6) ではこれは不可能です :-(

接続がブロックされたときにチャネルを確立しようとすることを中心に、同様の問題がありました。結果はあなたが経験しているものと同じでした。

私は、RabbitMQ C# .Net ライブラリの実際のコアを調査したところ、問題の根本的な原因は、ライブラリが無限のブロック状態になることであることがわかりました。

ここで、RabbitMQ メーリング リストの詳細を確認できます。

http://rabbitmq.1065348.n5.nabble.com/Net-Client-locks-trying-to-create-a-channel-on-a-blocked-connection-td21588.html

1 つの提案 (これは実装しませんでした) は、スレッド内で作業を行い、他のコンポーネントでタイムアウトを管理し、タイムアウトを超えた場合はスレッドを強制終了することでした。私たちはリスクを受け入れました:-(

于 2013-01-18T13:15:29.060 に答える