私は小さな Django アプリケーションを書いています。モデル オブジェクトごとに、一定の間隔で実行される定期的なタスクを作成できるはずです。私はこれにCeleryアプリケーションを使用していますが、1つのことを理解できません:
class ProcessQueryTask(PeriodicTask):
run_every = timedelta(minutes=1)
def run(self, query_task_pk, **kwargs):
logging.info('Process celery task for QueryTask %d' %
query_task_pk)
task = QueryTask.objects.get(pk=query_task_pk)
task.exec_task()
return True
それから私は次のことをしています:
>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)
最初の呼び出しは成功しましたが、エラーを返す他の定期的な呼び出し - TypeError: run() は、celeryd サーバーでキーワード以外の引数を 2 つ (1 つ指定) 取ります。独自のパラメーターを PeriodicTask に渡すことはできますrun()
か?