0

Rabbitでワークフロー環境をセットアップしました。

それは私たちのニーズを解決しますが、スケジュールされたタスクのようにそれを行うことも良い習慣であるかどうかを知りたい.

スケジューリングとは、ミッション クリティカルな 100% 調整された時間がないことを意味します。したがって、ジョブを 60 秒後に再試行する必要がある場合は、60 秒以上かかることを意味し、キューがいつ処理されるかによって異なります。

Q_WAIT を 1 つ作成し、設定を転送するためのヘッダーをいくつか作成しました。

次のようにしましょう:

ワーカーは Q_ACTION でサブスクライブされて実行されています

アクションが失敗した場合 (smtp サーバーに到達できないなど)

-> (再) メッセージを Q_WAIT にパブリッシュし、properties.headers["scheduled"] = time + 60seconds を設定します。


別のプロセスは、メソッド pop() によって Q_WAIT 内のすべてのメッセージを 15 秒ごとにループし、subsubscribed によってループしません。

q_WAIT.pop(:ack => true) do |delivery_info,properties,body|...

  if (properties.headers["scheduled"] has reached its time)

     -> (Re-)Publish the message back to Q_ACTION
        ack(message)

各ループの後、接続が閉じられ、NOT (Re-)Published が確認されなかったために Q_WAIT に残されます。


誰かがこれを実用的な(良い)慣行として確認できますか。

4

2 に答える 2

3

元の質問で説明されているように、ループプロセスを使用できます。

また、 Dead Letter Exchanges extensionでTime-To-Live Extension を利用することもできます。

最初に、x-dead-letter-exchange Q_WAIT現在の交換に等しく、バインドさx-dead-letter-routing-keyれたルーティング キーに等しいキュー引数を指定します。Q_ACTION

次にx-message-ttl、メッセージごとのカスタム ttl が必要な場合は、パブリッシュ中にキュー引数セットまたはメッセージ有効期限プロパティを設定します (これはベスト プラクティスではありませんが、いくつかのよく知られた警告がありますが、機能します)。

この場合、メッセージは、追加のコンシューマなしで ttl が期限切れQ_WAITになったQ_ACTION直後に配信不能になります。これは、より信頼性が高く安定しています。

高度な再パブリッシュ ロジック (メッセージ本文、プロパティの変更) が必要な場合はQ_PRE_ACTION、メッセージを消費し、それらを変更してからターゲット キュー (たとえばQ_ACTION) にパブリッシュするための追加のキュー (たとえば ) が必要であることに注意してください。

于 2014-06-15T09:25:28.913 に答える
0

ここのコメントで述べたように、私はその機能を試してみx-dead-letter-exchangeましたが、ほとんどの要件で機能しました。1 つの質問/誤解は、TTL-PER-MESSAGE オプションです。

こちらの例をご覧ください。私の理解から:

  1. DLQ のタイムアウトは 10 秒です
  2. 最初のメッセージは、公開後 10 秒でサブスクライバーに表示されます。
  3. 2 番目のメッセージは、最初のメッセージの 1 秒後に投稿され、message-ttl (有効期限) は 3 秒です。

2 番目のメッセージは、公開から 3 秒後、最初のメッセージの前に発音されるはずです。

しかし、そのようには機能しませんでした。両方とも 10 秒後に利用可能になります。

Q: メッセージの有効期限が DLQ ttl を覆すべきではありませんか?

#!/usr/bin/env ruby
# encoding: utf-8

require 'bunny'

B = Bunny.new ENV['CLOUDAMQP_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name
  q = ch.queue(DELAYED_QUEUE, :durable => true, arguments: {
    # set the dead-letter exchange to the default queue
    'x-dead-letter-exchange' => '',
    # when the message expires, set change the routing key into the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 10000,
  })
  # publish to the default exchange with the the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.basic_publish('message content 1 ' + Time.now.strftime("%H-%M-%S"), "", DELAYED_QUEUE, :persistent => true)
  puts "#{Time.now}: Published the message 1"

  # wait moment before next publish
  sleep 1.0

  # puts this with a shorter ttl
  ch.basic_publish('message content 2 ' + Time.now.strftime("%H-%M-%S"), "", DELAYED_QUEUE, :persistent => true, :expiration => "3000")
  puts "#{Time.now}: Published the message 2"

  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue
  q = ch.queue DESTINATION_QUEUE, durable: true 
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message: #{body}"
  end
end

subscribe()
publish()

sleep
于 2014-06-15T12:48:55.343 に答える