0

RabbitMQ を使用して、1 台のマシン上のプロデューサーから、複数のマシンに分散された少数のコンシューマー グループにジョブを送信します。

プロデューサはジョブを生成してキューに配置し、コンシューマは 10 ミリ秒ごとにキューをチェックして、要求されていないジョブがあるかどうかを確認し、ジョブが利用可能な場合は一度にジョブをフェッチします。1 つの特定のワーカーがジョブの処理に時間がかかりすぎる場合 (GC の一時停止またはその他の一時的な問題)、他のコンシューマーはキューからジョブを自由に削除できるため、ジョブ全体のスループットは高く保たれます。

このシステムを最初にセットアップしたとき、キュー上の複数のコンシューマーに対してサブスクライバー関係をセットアップする方法を理解できませんでした。これにより、ポーリングしてわずかな余分なレイテンシーを導入する必要がなくなります。

ドキュメントを調べても、満足のいく答えは得られませんでした。メッセージ キューを使用するのは初めてで、上記のシナリオを正確に説明する言葉を知らない可能性があります。これは黒板システムのようなものですが、この場合、「スペシャリスト」はすべて同一であり、互いの結果を消費することはありません。結果は常にジョブ プロデューサーに報告されます。

何か案は?

4

2 に答える 2

0

pub-subscribe を取得するのは簡単です。最初は同じ問題がありましたが、うまく機能します。このプロジェクトには、http://www.rabbitmq.com/getstarted.htmlにいくつかの優れたヘルプ ページがあります。

RabbitMQ にはタイムアウトと resert フラグがあり、必要に応じて使用できます。

また、ワーカーを 10 ミリ秒ごとにチェックするなどのイベント駆動型にすることもできます。これについて助けが必要な場合は、http: //rabbitears.codeplex.com/ に小さなプロジェクトがあります。

于 2011-01-16T12:18:47.957 に答える
0

ここで、rabbitMQ チャネルはスレッドセーフではないことに注意してください。これらすべてのrabbitmq操作を処理するシングルトンクラスを作成します

お気に入り

SCALAでコードサンプルを書いています

    Object QueueManager{

      val FACTORY = new ConnectionFactory
      FACTORY setUsername (RABBITMQ_USERNAME)
      FACTORY setPassword (RABBITMQ_PASSWORD)
      FACTORY setVirtualHost (RABBITMQ_VIRTUALHOST)
      FACTORY setPort (RABBITMQ_PORT)
      FACTORY setHost (RABBITMQ_HOST)

    conn = FACTORY.newConnection
      var channel: com.rabbitmq.client.Channel =  conn.createChannel

    //here to decare consumer  for queue1
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable)
      channel.queueDeclare(QUEUE1, durable, false, false, null)
      channel queueBind (QUEUE1, EXCHANGE_NAME, QUEUE1_ROUTING_KEY)
      val queue1Consumer = new QueueingConsumer(channel)
      channel basicConsume (QUEUE1, false, queue1Consumer)

    //here to decare consumer  for queue2
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", durable)
      channel.queueDeclare(QUEUE2, durable, false, false, null)
      channel queueBind (QUEUE2, EXCHANGE_NAME, QUEUE2_ROUTING_KEY)
      val queue2Consumer = new QueueingConsumer(channel)
      channel basicConsume (QUEUE2, false, queue2Consumer)





    //here u should mantion distinct ROUTING key for each queue
       def addToQueueOne{
    channel.basicPublish(EXCHANGE_NAME, QUEUE1_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes)
    }

   def addToQueueTwo{
channel.basicPublish(EXCHANGE_NAME, QUEUE2_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, obj.getBytes)
}

def getFromQueue1:Delivery={
 queue1Consumer.nextDelivery
}

def getFromQueue2:Delivery={
  queue2Consumer.nextDelivery
}

}

2 つのキューのコード サンプルを作成しました。上記のようにさらにキューを追加できます。

于 2010-12-31T05:35:00.640 に答える