hereから特定の時間に特定の間隔で実行するようにタスクを設定できることがわかりましたが、それはタスク宣言中にのみ行われました。タスクを定期的に動的に実行するように設定するにはどうすればよいですか?
4 に答える
スケジュールは設定から派生しているため、実行時には不変のようです。
Task ETAを使用すると、探していることを達成できる可能性があります。これにより、タスクが希望の時間より前に実行されないことが保証されますが、指定された時間にタスクを実行することは約束されません。指定されたETAでワーカーが過負荷になると、タスクは後で実行される可能性があります。
その制限が問題にならない場合は、最初に次のように実行されるタスクを作成できます。
@task
def mytask():
keep_running = # Boolean, should the task keep running?
if keep_running:
run_again = # calculate when to run again
mytask.apply_async(eta=run_again)
# ... do the stuff you came here to do ...
このアプローチの主な欠点は、実行中のタスクを記憶するためにタスクストアに依存していることです。それらの1つが失敗してから次のタスクを実行した場合、タスクは二度と実行されません。ブローカーがディスクに永続化されておらず、ブローカーが停止した場合(すべての実行中のタスクを実行している場合)、これらのタスクはいずれも再度実行されません。
これらの問題は、ある種のトランザクションログと定期的な「乳母」タスクを使用して解決できます。このタスクの仕事は、早すぎる死を遂げたタスクを見つけて復活させることです。
あなたが説明したことを実行しなければならなかったとしたら、これが私がそれにアプローチする方法だと思います。
celery.task.base.PeriodicTask
is_due
次の実行をいつ行うべきかを決定するものを定義します。この関数をオーバーライドして、カスタムの動的実行ロジックを含めることができます。ここのドキュメントを参照してください: http://docs.celeryproject.org/en/latest/reference/celery.task.base.html?highlight=is_due#celery.task.base.PeriodicTask.is_due
例:
import random
from celery.task import PeriodicTask
class MyTask(PeriodicTask):
def run(self, **kwargs):
logger = self.get_logger(**kwargs)
logger.info("Running my task")
def is_due(self, last_run_at):
# Add your logic for when to run. Mine is random
if random.random() < 0.5:
# Run now and ask again in a minute
return (True, 60)
else:
# Don't run now but run in 10 secs
return (True, 10)
ここを参照してくださいhttp://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
私はあなたがそれを動的に作ることができないと思います...最良の方法はタスクでタスクを作成することです:D
たとえば、X秒後に何かを実行したい場合は、x秒の遅延で新しいタスクを作成し、このタスクでN*X秒の遅延の別のタスクを作成します...