これは、他のいくつかのソリューションに基づいて構築された別のソリューションです [1]。redis WATCH コマンドを使用して、redis 2.6 で lua を使用せずに競合状態を削除します。
基本的なスキームは次のとおりです。
- スケジュールされたタスクには redis zset を使用し、すぐに実行できるタスクには redis キューを使用します。
- ディスパッチャーに zset をポーリングさせ、実行する準備ができているタスクを redis キューに移動させます。冗長性のために複数のディスパッチャが必要になる場合がありますが、おそらく多くは必要ないか、必要ありません。
- redis キューでポップをブロックするワーカーを必要な数だけ用意します。
私はそれをテストしていません:-)
foo ジョブの作成者は次のようにします。
def schedule_task(queue, data, delay_secs):
# This calculation for run_at isn't great- it won't deal well with daylight
# savings changes, leap seconds, and other time anomalies. Improvements
# welcome :-)
run_at = time.time() + delay_secs
# If you're using redis-py's Redis class and not StrictRedis, swap run_at &
# the dict.
redis.zadd(SCHEDULED_ZSET_KEY, run_at, {'queue': queue, 'data': data})
schedule_task('foo_queue', foo_data, 60)
ディスパッチャーは次のようになります。
while working:
redis.watch(SCHEDULED_ZSET_KEY)
min_score = 0
max_score = time.time()
results = redis.zrangebyscore(
SCHEDULED_ZSET_KEY, min_score, max_score, start=0, num=1, withscores=False)
if results is None or len(results) == 0:
redis.unwatch()
sleep(1)
else: # len(results) == 1
redis.multi()
redis.rpush(results[0]['queue'], results[0]['data'])
redis.zrem(SCHEDULED_ZSET_KEY, results[0])
redis.exec()
foo ワーカーは次のようになります。
while working:
task_data = redis.blpop('foo_queue', POP_TIMEOUT)
if task_data:
foo(task_data)
[1] このソリューションは、http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/にある not_a_golfer のものとトランザクションの redis ドキュメントに基づいています。