2

私は小さな Django アプリケーションを書いています。モデル オブジェクトごとに、一定の間隔で実行される定期的なタスクを作成できるはずです。私はこれにCeleryアプリケーションを使用していますが、1つのことを理解できません:

class ProcessQueryTask(PeriodicTask):
   run_every = timedelta(minutes=1)

   def run(self, query_task_pk, **kwargs):
       logging.info('Process celery task for QueryTask %d' %
query_task_pk)
       task = QueryTask.objects.get(pk=query_task_pk)
       task.exec_task()
       return True

それから私は次のことをしています:

>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)

最初の呼び出しは成功しましたが、エラーを返す他の定期的な呼び出し - TypeError: run() は、celeryd サーバーでキーワード以外の引数を 2 つ (1 つ指定) 取ります。独自のパラメーターを PeriodicTask に渡すことはできますrun()か?

4

1 に答える 1

5

これは、celery-users の Google グループでのあなたの質問に対するAsk Solem の回答で素晴らしい回答がありました。

定期タスクは引数を使用しないため、いくつかのクラスを作成するか、複数の「モデル」を処理する定期タスクを作成する必要があります。

例えば:

from celery.task import PeriodicTask
from celery.decorators import periodic_task

# base class
class BaseProcessQueryTask(PeriodicTask):
    abstract = True
    run_every = timedelta(minutes=1)
    query_task_pk  = None

    def run(self):
        task = QueryTask.objects.get(pk=self.query_task_pk)
        task.exec_task()

class ProcessQueryTask1(BaseProcessQueryTask):
    query_task_pk = 1

class ProcessQueryTask2(BaseProcessQueryTask):
    query_task_pk = 2

しかし、次のようなものが必要になる可能性が高くなります。

@task(ignore_result=True)
def execute_query_task(task):
    task.exec_task()

@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
    for task in QueryTask.objects.all():
        ExecuteQueryTask.delay(task)
于 2010-12-08T17:16:02.537 に答える