10

セロリ ビートとセロリ (4 ワーカー) を使用して、いくつかの処理手順を一括で実行します。これらのタスクの 1 つは、「Y が作成されていない X ごとに、Y を作成する」という行にほぼ沿ったものです。

タスクは、準高速 (10 秒) で定期的に実行されます。タスクは非常に迅速に完了します。進行中の他のタスクもあります。

ビート タスクが明らかにバックログになるという問題に何度も遭遇したため、(異なるビート タイムの) 同じタスクが同時に実行され、誤って重複した作業が発生します。また、タスクが順不同で実行されているようにも見えます。

  1. セロリビートを制限して、一度にタスクの未解決のインスタンスを 1 つだけにすることはできますか? rate_limit=5これを行うための「正しい」方法は、タスクのようなものを設定していますか?

  2. Beat タスクが順番どおりに実行されるようにすることは可能ですか? たとえば、タスクをディスパッチする代わりに、beat がそれをタスク チェーンに追加しますか?

  3. これらのタスク自体をアトミックに実行し、同時に実行しても安全である以外に、これを処理する最良の方法は何ですか? これは、私が予想していたビート タスクの制限ではありませんでした…</p>

タスク自体は素朴に定義されています。

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

これが実際の(クリーンアップされた)ログです。

  • [00:00.000]foocorp.tasks.add_y_to_xs が送信されました。ID->#1
  • [00:00.001]受信タスク: foocorp.tasks.add_y_to_xs[#1]
  • [00:10.009]foocorp.tasks.add_y_to_xs が送信されました。ID->#2
  • [00:20.024]foocorp.tasks.add_y_to_xs が送信されました。ID->#3
  • [00:26.747]受信タスク: foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748]TaskPool: 適用 #2
  • [00:26.752]受信タスク: foocorp.tasks.add_y_to_xs[#3]
  • [00:26.769]受け入れられたタスク: foocorp.tasks.add_y_to_xs[#2] pid:26528
  • [00:26.775]タスク foocorp.tasks.add_y_to_xs[#2] は 0.0197986490093s で成功しました: なし
  • [00:26.806]TaskPool: 適用 #1
  • [00:26.836]TaskPool: 適用 #3
  • [01:30.020]受け入れられたタスク: foocorp.tasks.add_y_to_xs[#1] pid:26526
  • [01:30.053]受け入れられたタスク: foocorp.tasks.add_y_to_xs[#3] pid:26529
  • [01:30.055]foocorp.tasks.add_y_to_xs[#1]: X ID に Y を追加 #9725
  • [01:30.070]foocorp.tasks.add_y_to_xs[#3]: X ID に Y を追加 #9725
  • [01:30.074]タスク foocorp.tasks.add_y_to_xs[#1] は 0.0594762689434s で成功しました: なし
  • [01:30.087]タスク foocorp.tasks.add_y_to_xs[#3] は 0.0352867960464s で成功しました: なし

現在、トランスポートとしてRabbitMQを備えたCelery 3.1.4を使用しています。

編集ダン、ここに私が思いついたものがあります:

ダン、これが私が最終的に使用したものです:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield

そして、セロリ タスク デコレータ (定期的なタスクにのみ使用):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
4

5 に答える 5

7

これを行う唯一の方法は、自分でロック戦略を実装することです:

参照については、こちらのセクションをお読みください。

cron と同様に、最初のタスクが次のタスクの前に完了しない場合、タスクが重複する可能性があります。それが懸念される場合は、ロック戦略を使用して、一度に 1 つのインスタンスのみが実行されるようにする必要があります (たとえば、タスクが一度に 1 つだけ実行されるようにするを参照してください)。

于 2014-01-03T01:22:14.673 に答える
2

私は、 erydo が彼のコメントでほのめかしたものと同様に、Postgres のアドバイザリ ロックを使用するデコレータを作成することに挑戦しました。

あまりきれいではありませんが、正しく動作しているようです。これは、Python 2.7 で SQLAlchemy 0.9.7 を使用した場合です。

from functools import wraps
from sqlalchemy import select, func

from my_db_module import Session # SQLAlchemy ORM scoped_session

def pg_locked(key):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kw):
            session = db.Session()
            try:
                acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
                if acquired:
                    return f(*args, **kw)
            finally:
                if acquired:
                    session.execute(select([func.pg_advisory_unlock(key)]))
        return wrapped
    return decorator

@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
    # only 1x this task can run at a time
    pass

(これを改善する方法についてのコメントを歓迎します!)

于 2014-08-26T20:48:10.627 に答える
1

これらの Celery ビート インスタンスは、異なるホストにまたがる可能性がある本質的に異なるプロセスであるため、分散ロック システムが必要です。

ZooKeeper や etcd などの中央座標系は、分散ロック システムの実装に適しています。

軽量で高速な etcd の使用をお勧めします。以下のように、etcd に対するロックの実装がいくつかあります。

python-etcd-ロック

于 2016-01-10T18:26:12.963 に答える