2

bunny ruby​​ gemを使用して、rabbitmq サーバーとの間でメッセージを送受信しています。待機時間にタイムアウトを設定しながら、キューからメッセージを同期的にポップするにはどうすればよいですか (つまり、3 秒後にメッセージが到着しない場合はブロックを停止します)。

明らかな解決策の 1 つは、タイムアウトになるかメッセージが受信されるまで pop 呼び出しをループすることですが、これは非常に非効率的です。よりエレガントなソリューションはありますか?bunny のドキュメントと rabbitmq サイトのチュートリアルを調べましたが、この特定のシナリオの解決策が見つかりません。

4

3 に答える 3

2

このような機能を作成するには、基本的なメソッド subscribe を書き直す必要がありました。チャネルのタイムアウト時間を設定できることがわかりましたが、関数にそのような入力パラメーターがありませんでした。

response = nil

subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
  Rails.logger.info "got message #{payload}"
  response = payload
  @channel.consumers[delivery_info.consumer_tag].cancel
end



def subscribe(opts = {block: false}, &block)
    ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
    consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)

    consumer.on_delivery(&block)
    @channel.basic_consume_with(consumer)

    if opts[:block]
      @channel.work_pool.join(opts[:timeout])
    end
end
于 2016-02-01T09:28:16.803 に答える
1

Bunny を使用して簡単に行う方法が見つかりませんでした。ここで提案する方法は、タイムアウトなしでブロックします。ただし、呼び出しセマンティクスごとに 1 つのメッセージを取得することはサポートしています。Bunny が内部的にスレッド プールを使用してメッセージを受信することを考えると、Ruby のQueueクラスなどのブロッキング キューを使用して、Bunny のスレッド プールから呼び出しスレッドにメッセージを転送するのがより簡単な方法であると考えました。次のようなもの:

# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new

# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
  @internal_queue.enq(payload)  # this runs inside Bunny's pool
end

# the call to deq is blocking
response = @internal_queue.deq  # this blocks the main thread till a
                                # message is pushed to the internal_q

リッスンする必要がある AMQP チャネルごとに 1 つの @internal_queue を維持できます。これらの部分を個別のメソッドに分割して、一度に 1 つのメッセージを返す適切なブロッキング API を作成できます。

後で、モニター MonitorMixin で拡張された単純な配列をラップし、ミューテックス + 条件変数セマンティクスを使用する TimedWaitableQueue クラスを作成しました。これにより、タイムアウトのあるデキュー呼び出しでのブロックが可能になりました。

于 2016-08-04T16:39:58.603 に答える