9

RabbitMQ サーバーにメッセージを送信してから、(「返信先」キューで) 返信メッセージを待ちたいと思います。もちろん、これらのメッセージを処理するアプリケーションがダウンした場合に備えて、永遠に待ちたくはありません。タイムアウトが必要です。非常に基本的な作業のように思えますが、これを行う方法が見つかりません。py-amqplibRabbitMQ .NET clientの両方でこの問題に遭遇しました。

私がこれまでに得た最善の解決策は、中間で使用basic_getしてポーリングするsleepことですが、これはかなり醜いです:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

確かに何か良い方法がありますか?

4

5 に答える 5

9

.NET クライアントで行ったことは次のとおりです。

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

残念ながら、py-amqplib で同じことを行うことはできません。basic_consumeコールバックを呼び出さない限り、そのメソッドはコールバックを呼び出さず、タイムアウトをサポートしていないchannel.wait()からです! channel.wait()このばかげた制限 (私が何度も遭遇しています) は、別のメッセージを受信しない場合、スレッドが永久に凍結されることを意味します。

于 2010-05-10T01:57:48.607 に答える
8

amqplibinのタイムアウトサポートを追加しましたcarrot

これは のサブクラスですamqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multichannel.wait任意の数のチャネルで受信できるバージョンです。

これはある時点で上流にマージされる可能性があると思います。

于 2010-05-10T13:57:34.900 に答える
2

qpidを aと一緒に使用した例がここにあります。申し訳ありませんが、タイムアウトを実装している他の A​​MQP クライアント ライブラリはわかりません (特に、あなたが言及した 2 つの特定のものはわかりません)。msg = q.get(timeout=1)

于 2010-05-10T00:36:46.907 に答える
1

これは非同期処理の全体的な考え方を壊しているように見えますが、必要がある場合、それを行う正しい方法はRpcClientを使用することだと思います。

于 2010-05-10T00:35:01.840 に答える
1

Rabbit でタイムアウト イベントを追加できるようになりました。コードを try catch でラップし、TimeOut および Disconnect ハンドラーで例外をスローするだけです。

try{
    using (IModel channel = rabbitConnection.connection.CreateModel())
    {
        client = new SimpleRpcClient(channel, "", "", queue);
        client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
        client.TimedOut += RpcTimedOutHandler;
        client.Disconnected += RpcDisconnectedHandler;
        byte[] replyMessageBytes = client.Call(message);
        return replyMessageBytes;
    }
}
catch (Exception){
    //Handle timeout and disconnect here
}
private void RpcDisconnectedHandler(object sender, EventArgs e)
{
     throw new Exception("RPC disconnect exception occured.");
}

private void RpcTimedOutHandler(object sender, EventArgs e)
{
     throw new Exception("RPC timeout exception occured.");
}
于 2016-04-22T08:26:25.053 に答える