セロリとジャンゴセロリを使用しています。テストしたい定期的なタスクを定義しました。シェルから定期的なタスクを手動で実行して、コンソール出力を表示することはできますか?
3 に答える
Djangoシェルからタスクを実行してみましたか?タスクのメソッドを使用して.apply
、タスクが熱心にローカルで実行されるようにすることができます。
タスクがサブモジュールのmy_task
Djangoアプリmyapp
で呼び出されると仮定します。tasks
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
結果インスタンスのAPIは通常のAsyncResult
タイプと同じですが、結果は常に熱心にローカルで評価され.apply()
、タスクが完了するまでメソッドがブロックされる点が異なります。
たとえば、定期的な時間が満たされないなど、条件が満たされないときにタスクをトリガーするだけの場合。2 つのステップでそれを行うことができます。
1.タスク ID を取得します。
入力することで実行できます。
celery inspect registered
のようなものが表示されますapp.tasks.update_something
。何もない場合は、おそらくcelery
開始されていません。実行するだけです。
2.タスクを実行しますcelery call
celery call app.tasks.update_something
詳細については、入力してください
celery --help
celery inspect --help
celery call --help
2 つのシェルを開く必要があると思います。1 つは Python/Django シェルからタスクを実行するためのもので、もう 1 つは実行するためのものですcelery worker
( python manage.py celery worker
)。apply()
前の回答で述べたように、またはを使用してタスクを実行できます。apply_async()
非推奨のコマンドを使用していないように、回答を編集しました。