セロリ ビートとセロリ (4 ワーカー) を使用して、いくつかの処理手順を一括で実行します。これらのタスクの 1 つは、「Y が作成されていない X ごとに、Y を作成する」という行にほぼ沿ったものです。
タスクは、準高速 (10 秒) で定期的に実行されます。タスクは非常に迅速に完了します。進行中の他のタスクもあります。
ビート タスクが明らかにバックログになるという問題に何度も遭遇したため、(異なるビート タイムの) 同じタスクが同時に実行され、誤って重複した作業が発生します。また、タスクが順不同で実行されているようにも見えます。
セロリビートを制限して、一度にタスクの未解決のインスタンスを 1 つだけにすることはできますか?
rate_limit=5
これを行うための「正しい」方法は、タスクのようなものを設定していますか?Beat タスクが順番どおりに実行されるようにすることは可能ですか? たとえば、タスクをディスパッチする代わりに、beat がそれをタスク チェーンに追加しますか?
これらのタスク自体をアトミックに実行し、同時に実行しても安全である以外に、これを処理する最良の方法は何ですか? これは、私が予想していたビート タスクの制限ではありませんでした…</p>
タスク自体は素朴に定義されています。
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
これが実際の(クリーンアップされた)ログです。
[00:00.000]
foocorp.tasks.add_y_to_xs が送信されました。ID->#1[00:00.001]
受信タスク: foocorp.tasks.add_y_to_xs[#1][00:10.009]
foocorp.tasks.add_y_to_xs が送信されました。ID->#2[00:20.024]
foocorp.tasks.add_y_to_xs が送信されました。ID->#3[00:26.747]
受信タスク: foocorp.tasks.add_y_to_xs[#2][00:26.748]
TaskPool: 適用 #2[00:26.752]
受信タスク: foocorp.tasks.add_y_to_xs[#3][00:26.769]
受け入れられたタスク: foocorp.tasks.add_y_to_xs[#2] pid:26528[00:26.775]
タスク foocorp.tasks.add_y_to_xs[#2] は 0.0197986490093s で成功しました: なし[00:26.806]
TaskPool: 適用 #1[00:26.836]
TaskPool: 適用 #3[01:30.020]
受け入れられたタスク: foocorp.tasks.add_y_to_xs[#1] pid:26526[01:30.053]
受け入れられたタスク: foocorp.tasks.add_y_to_xs[#3] pid:26529[01:30.055]
foocorp.tasks.add_y_to_xs[#1]: X ID に Y を追加 #9725[01:30.070]
foocorp.tasks.add_y_to_xs[#3]: X ID に Y を追加 #9725[01:30.074]
タスク foocorp.tasks.add_y_to_xs[#1] は 0.0594762689434s で成功しました: なし[01:30.087]
タスク foocorp.tasks.add_y_to_xs[#3] は 0.0352867960464s で成功しました: なし
現在、トランスポートとしてRabbitMQを備えたCelery 3.1.4を使用しています。
編集ダン、ここに私が思いついたものがあります:
ダン、これが私が最終的に使用したものです:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
そして、セロリ タスク デコレータ (定期的なタスクにのみ使用):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task