19

Redis 駆動のスケーラブルなタスク スケジューリング システムを設計する必要があります。

要件:

  • 複数のワーカー プロセス。
  • 多くのタスクがありますが、長期間アイドル状態になる可能性があります。
  • 妥当なタイミング精度。
  • アイドル時のリソースの浪費を最小限に抑えます。
  • 同期 Redis API を使用する必要があります。
  • Redis 2.4 で動作するはずです (つまり、今後の 2.6 からの機能はありません)。
  • Redis 以外の RPC の手段を使用しないでください。

疑似 API: schedule_task(timestamp, task_data). タイムスタンプは整数秒です。

基本的な考え方:

  • リストの今後のタスクをリッスンします。
  • タイムスタンプごとにタスクをバケットに入れます。
  • 最も近いタイムスタンプまでスリープします。
  • タイムスタンプが最も近いものよりも小さい新しいタスクが表示された場合は、ウェイクアップします。
  • タイムスタンプ ≤ now のすべての今後のタスクをバッチで処理します (タスクの実行が高速であると仮定します)。
  • 並行ワーカーが同じタスクを処理しないことを確認してください。同時に、タスクの処理中にクラッシュしてもタスクが失われないようにします。

これまでのところ、これを Redis プリミティブに適合させる方法がわかりません...

手がかりはありますか?

同様の古い質問があることに注意してください: Redis での実行の遅延 / スケジューリング? この新しい質問では、より詳細を紹介します (最も重要なのは、多くのワーカー)。これまでのところ、ここで古い回答を適用する方法を理解できませんでした。つまり、新しい質問です。

4

5 に答える 5

12

これは、他のいくつかのソリューションに基づいて構築された別のソリューションです [1]。redis WATCH コマンドを使用して、redis 2.6 で lua を使用せずに競合状態を削除します。

基本的なスキームは次のとおりです。

  • スケジュールされたタスクには redis zset を使用し、すぐに実行できるタスクには redis キューを使用します。
  • ディスパッチャーに zset をポーリングさせ、実行する準備ができているタスクを redis キューに移動させます。冗長性のために複数のディスパッチャが必要になる場合がありますが、おそらく多くは必要ないか、必要ありません。
  • redis キューでポップをブロックするワーカーを必要な数だけ用意します。

私はそれをテストしていません:-)

foo ジョブの作成者は次のようにします。

def schedule_task(queue, data, delay_secs):
    # This calculation for run_at isn't great- it won't deal well with daylight
    # savings changes, leap seconds, and other time anomalies. Improvements
    # welcome :-)
    run_at = time.time() + delay_secs

    # If you're using redis-py's Redis class and not StrictRedis, swap run_at &
    # the dict.
    redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})

schedule_task('foo_queue', foo_data, 60)

ディスパッチャーは次のようになります。

while working:
    redis.watch(SCHEDULED_ZSET_KEY)
    min_score = 0
    max_score = time.time()
    results = redis.zrangebyscore(
        SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
    if results is None or len(results) == 0:
        redis.unwatch()
        sleep(1)
    else: # len(results) == 1
        redis.multi()
        redis.rpush(results[0]['queue'], results[0]['data'])
        redis.zrem(SCHEDULED_ZSET_KEY, results[0])
        redis.exec()

foo ワーカーは次のようになります。

while working:
    task_data = redis.blpop('foo_queue', POP_TIMEOUT)
    if task_data:
        foo(task_data)

[1] このソリューションは、http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/にある not_a_golfer のものとトランザクションの redis ドキュメントに基づいています。

于 2013-02-22T03:27:28.423 に答える
7

使用している言語を指定していません。少なくとも Python でコードを 1 行も書かずにこれを行うには、少なくとも 3 つの選択肢があります。

  1. Celery にはオプションの redis ブローカーがあります。 http://celeryproject.org/

  2. resque は、redis を使用した非常に人気のある redis タスク キューです。 https://github.com/defunkt/resque

  3. RQ はシンプルで小さな redis ベースのキューであり、「セロリとレスクから良いものを取り」、より簡単に操作できるようにすることを目的としています。 http://python-rq.org/

それらを使用できない場合は、少なくともそのデザインを見ることができます。

しかし、あなたの質問に答えるために-あなたが望むことはredisで行うことができます。私は実際に過去に多かれ少なかれそれを書いてきました。

編集:redisで必要なものをモデリングする場合、これは私が行うことです:

  1. タイムスタンプを使用してタスクをキューに入れることは、クライアントによって直接行われます。タイムスタンプをスコアとし、タスクを値として並べ替えられたセットにタスクを配置します (ZADD を参照)。

  2. 中央のディスパッチャは N 秒ごとに起動し、このセットの最初のタイムスタンプをチェックアウトします。実行の準備ができているタスクがある場合は、そのタスクを「すぐに実行する」リストにプッシュします。これは、「待機中」のソート済みセットで ZREVRANGEBYSCORE を使用して実行でき、timestamp<=now のすべてのアイテムを取得するため、準備ができているすべてのアイテムを一度に取得できます。プッシュは RPUSH で行います。

  3. 労働者は「今すぐ実行する」リストでBLPOPを使用し、何か取り組むべきことがあるときに目を覚まして、自分の仕事をします。redis はシングル スレッドであり、2 つのワーカーが同じタスクを実行することはないため、これは安全です。

  4. 終了すると、ワーカーは結果を応答キューに戻し、ディスパッチャーまたは別のスレッドによってチェックされます。「保留中」のバケットを追加して、障害などを回避できます。

したがって、コードは次のようになります (これは単なる疑似コードです)。

クライアント:

ZADD "new_tasks" <TIMESTAMP> <TASK_INFO>

発車係:

while working:
   tasks = ZREVRANGEBYSCORE "new_tasks" <NOW> 0 #this will only take tasks with timestamp lower/equal than now
   for task in tasks:

       #do the delete and queue as a transaction
       MULTI
       RPUSH "to_be_executed" task
       ZREM "new_tasks" task
       EXEC

   sleep(1)

応答キューの処理は追加しませんでしたが、多かれ少なかれワーカーに似ています。

ワーカー:

while working:
   task = BLPOP "to_be_executed" <TIMEOUT>
   if task:
      response = work_on_task(task)
      RPUSH "results" response

編集: ステートレス アトミック ディスパッチャー:

while working:

   MULTI
   ZREVRANGE "new_tasks" 0 1
   ZREMRANGEBYRANK "new_tasks" 0 1
   task = EXEC

   #this is the only risky place - you can solve it by using Lua internall in 2.6
   SADD "tmp" task

   if task.timestamp <= now:
       MULTI
       RPUSH "to_be_executed" task
       SREM "tmp" task
       EXEC
   else:

       MULTI
       ZADD "new_tasks" task.timestamp task
       SREM "tmp" task
       EXEC

   sleep(RESOLUTION)
于 2012-06-03T10:53:38.270 に答える
0

組み合わせたアプローチはもっともらしいようです:

  1. 新しいタスクのタイムスタンプが現在の時間よりも小さい場合はありません (小さい場合はクランプ)。信頼できる NTP 同期を前提としています。

  2. すべてのタスクは、タスク タイムスタンプのサフィックスが付いたキーのバケット リストに移動します。

  3. さらに、すべてのタスクのタイムスタンプは専用の zset (キーとスコア - タイムスタンプ自体) に送られます。

  4. 新しいタスクは、別の Redis リストを介してクライアントから受け入れられます。

  5. ループ: zrangebyscore ... limit を使用して、最も古い N 個の期限切れのタイムスタンプを取得します。

  6. 新しいタスク リストとフェッチされたタイムスタンプのリストのタイムアウトを伴う BLPOP。

  7. 古いタスクを取得した場合は、それを処理します。新しい場合 — バケットと zset に追加します。

  8. 処理されたバケットが空かどうかを確認します。もしそうなら — zset から list と entrt を削除します。時刻同期の問題を防ぐために、最近期限切れになったバケットはおそらくチェックしないでください。ループを終了します。

批評?コメント? 代替案?

于 2012-06-03T08:23:38.357 に答える