Celery はこのすぐに使用できるものをサポートしていませんが、過去に同様のことを行う必要があり、自分で解決策を見つけなければなりませんでした。
私の経験では、これを実現する方法は 2 つありますが、いずれもトレードオフがあります。このようなものに足を踏み入れることができるかなり大きな穴もいくつかあるので、emptorに注意してください。
オプション1:
データストアを使用して、タスクをいつ実行する必要があるかに関する情報を保存し、セロリ ビート タスクをトリガーします。
これを行うには、定期的なタスクに関する情報を保持するデータベースとモデルを使用できます。(より多くの技術を取得したい場合は、キューに直接話しかけて、モデル ルートをスキップすることもできます。)
from django.db import models
class PeriodicTask(models.Model):
lastrun = models.DateTimeField()
nextrun = models.DateTimeField()
notes = models.TextField() # errors?
task_id = models.CharField(max_length=100)
これは、モデルが何を格納できるかについての単なる大まかなアイデアです。便利なものは何でもそこに置くことができますが、次の実行がいつ行われるかを格納するための datetime オブジェクトが必要になります。
次に、スピンアップしてすぐに実行する必要があるタスクがあるかどうかを確認するために、定期的なタスクをより頻繁に実行する必要があります。
import datetime
from .models import PeriodicTask
@periodic_task(run_every=timedelta(minutes=2), queue='activities', options={'queue': 'activities'})
def pull_activities_frequent_adaptors():
now = datetime.datetime.utcnow() # need to be clear about time-zones
scheduled_tasks = PeriodicTask.objects.filter(nextrun__gte=now)
if scheduled_tasks and scheduled_tasks.count() == 1: # more than one and we've erred somewhere
timewindow = datetime.timedelta(minutes=5)
if (scheduled_tasks[0].nextrun - now) <= timewindow:
scheduled_tasks[0].delete()
# Do the task
# schedule the next one
PeriodicTask.objects.create(
lastrun=now,
nextrun=now + datetime.timedelta(minutes=30))
潜在的な問題:
1) マスター/スレーブ設定の複数のデータベースがあり、特にラグがある場合は、二重のスケジューリングが行われる可能性があります (count() == 1
パーツであっても)。したがって、検討する価値のある競合状態があります。
2) 実行するタスクを見つけるために時間枠を使用する必要があるため、正確に 30 分に近づけるのは困難です。
3) タスクは時間枠よりも頻繁に実行する必要があります。そうしないと、タスクを見逃す可能性があります。これはリソースの浪費になる可能性があります (ただし、それほどひどいものではないと思います)。通常はスピンアップして何もしないからです。
4) 日付時刻を扱うこと以上に頭を悩ませるものはないので、タイムゾーンのことを本当に考慮し、すべてのバリエーションについて考え、このコードを徹底的にテストする必要があります。
5)これは大きな問題です。タスクの実行にスケジュールされた間隔よりも長い時間がかかる場合、2 つのタスクが同時に実行されることになります。これは問題です。繰り返しになりますが、競合状態が発生すると、状況が悪化する可能性があります。
オプション 2)
セロリ ビートを使用しないでください。最初のタスクを起動し、30 分後に別のタスクを起動させます。これは暴走する魔法使いの見習い的なものになる可能性があるので、ちょっと、ええと、怖いと思います.1番目のオプションを実行しましたが、次のオプションについて実際に話したことはありません. しかし、とにかく、私はそれができると思います:
@task # no longer a periodic task
def your_task(args):
# Whatever you want to do, then call itself again...
your_task.apply_async(args=(args), countdown=1800)
これをどこかで呼び出す必要があります。おそらく、週に 1 回スピンアップし、このことの以前のバージョンをすべて強制終了し (どのようにそれらを見つけますか?)、最初のバージョンを起動する cron ジョブである可能性があります。
私はこの答えがあまり好きではないと言わざるを得ず、何度か思いついたことがありますが、問題に対処するためのより危険で手に負えない方法のように思えます. 誰かがそれを行うかどうか、私は興味があります。