7

いくつかのコンテキスト:ユーザーがアクションを事前に保存し、そのアクションを実行したい将来の正確な日付/時刻をスケジュールできるDjangoアプリを構築しています。たとえば、来週の午前 5 時 30 分に投稿をプログラムで Facebook ウォールにプッシュするようにスケジュールを設定します。

私は、1 回限りのタスクの 1000 のインスタンスを処理でき、すべてがほぼ同時に実行されるように設定されている (エラー マージン プラスまたはマイナス 1 分) タスク スケジューリング システムを探しています。

これには Django-celery/Rabbitmq を検討していますが、Celery のドキュメントは 1 回限りの使用を意図したタスクに対応していないことに気付きました。Django-celery はここで正しい選択ですか (おそらく CrontabSchedule をサブクラス化することによって)、それとも他のアプローチの研究にエネルギーを費やすほうがよいでしょうか? おそらく、Sched Moduleと Cron を使って何かをハッキングします。

4

2 に答える 2

12

編集2:

どういうわけか、私の頭はもともと繰り返しタスクの領域で立ち往生していました。これがより簡単な解決策です。

本当に必要なのは、ユーザー アクションごとに 1 つのタスクを定義することだけです。実行するタスクをデータベースに保存することをスキップできます。それがセロリの目的です。

Facebook の投稿の例をもう一度再利用してpost_to_facebook、ユーザーといくつかのテキストを取得し、いくつかの魔法を実行し、テキストをそのユーザーの Facebook に投稿する関数がどこかにあると仮定すると、次のようなタスクになるように定義できます。

# Task to send one update.
@celery.task(ignore_result=True)
def post_to_facebook(user, text):
    # perform magic
    return whatever_you_want

ユーザーがそのような投稿をキューに入れる準備ができたら、タスクをいつ実行するかをセロリに伝えます。

post_to_facebook.apply_async(
    (user, text),   # args
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440)  # pass execution options as kwargs
)

これはすべて、利用可能な呼び出しオプションの全範囲の中で、ここで詳細に説明されています: http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

呼び出しの結果が必要な場合は、タスク定義で ignore_result パラメータをスキップして AsyncResult オブジェクトを取得し、呼び出しの結果を確認できます。詳細はこちら: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

以下の回答の一部はまだ関連しています。ユーザーのアクションごとにタスクを作成したり、タスクの設計について考えたりする必要がありますが、これは、求めたことを実行するためのはるかに簡単な方法です。

定期的なタスクを使用した元の回答は次のとおりです。

ダニーロアの考えは正しい。私はここで少しそれを構築します。

編集/TLDR: 答えははいです。セロリはあなたのニーズに適しています. タスク定義を再考する必要があるかもしれません。

ユーザーが任意の Python コードを記述してタスクを定義することを許可していないと思います。それ以外では、ユーザーがスケジュールできるいくつかのアクションを事前に定義してから、ユーザーがそれらのアクションを好きなようにスケジュールできるようにする必要があります。次に、ユーザー アクションごとに 1 つのスケジュールされたタスクを実行するだけで、エントリをチェックし、各エントリに対してアクションを実行できます。

1 つのユーザー アクション:

Facebook の例を使用すると、ユーザーの更新をテーブルに保存できます。

class ScheduledPost(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    time = DateTimeField()
    sent = BooleanField(default=False)

次に、毎分タスクを実行し、そのテーブルのエントリが直前に投稿される予定であるかどうかを確認します (前述のエラー マージンに基づいて)。1 分間のウィンドウに到達することが非常に重要な場合は、タスクをより頻繁に、たとえば 30 秒ごとにスケジュールすることができます。タスクは次のようになります (myapp/tasks.py 内):

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        if post_to_facebook(post.text):
            post.sent = True
            post.save()

構成は次のようになります。

CELERYBEAT_SCHEDULE = {
    'fb-every-30-seconds': {
        'task': 'tasks.post_scheduled_updates',
        'schedule': timedelta(seconds=30),
    },
}

追加のユーザー アクション:

Facebook への投稿に加えて、ユーザー アクションごとに、新しいテーブルと新しいタスクを定義できます。

class EmailToMom(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    subject = CharField(max_length=255)
    sent = BooleanField(default=False)
    time = DateTimeField()

@celery.task
def send_emails_to_mom():
    scheduled_emails = EmailToMom.objects.filter(
        sent=False,
        time__lt=timezone.now()
    )
    for email in scheduled_emails:
        sent = send_mail(
            email.subject,
            email.text,
            email.user.email,
            [email.user.mom.email],
        )
        if sent:
            email.sent = True
            email.save()

    CELERYBEAT_SCHEDULE = {
        'fb-every-30-seconds': {
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        },
        'mom-every-30-seconds': {
            'task': 'tasks.send_emails_to_mom',
            'schedule': timedelta(seconds=30),
        },
    }

速度と最適化:

より多くのスループットを得るには、更新を反復して投稿し、post_scheduled_updates呼び出し中にそれらを連続して送信するのではなく、一連のサブタスクを生成して並行して実行できます (十分なworkerが与えられた場合)。次に、 への呼び出しpost_scheduled_updatesが非常に迅速に実行され、多数のタスク (fb の更新ごとに 1 つ) ができるだけ早く実行されるようにスケジュールされます。それは次のようになります。

# Task to send one update. This will be called by post_scheduled_updates.
@celery.task
def post_one_update(update_id):
    try:
        update = ScheduledPost.objects.get(id=update_id)
    except ScheduledPost.DoesNotExist:
        raise
    else:
        sent = post_to_facebook(update.text)
        if sent:
            update.sent = True
            update.save()
        return sent

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        post_one_update.delay(post.id)

私が投稿したコードはテストされておらず、確かに最適化されていませんが、正しい方向に進むはずです。あなたの質問では、スループットに関する懸念をほのめかしたので、最適化する場所を詳しく調べたいと思うでしょう。明らかな 1 つは、 を繰り返し呼び出すのではなく、一括更新することpost.sent=True;post.save()です。

より詳しい情報:

定期的なタスクの詳細: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html .

タスク設計戦略に関するセクション: http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

ここにセロリの最適化に関するページ全体があります: http://docs.celeryproject.org/en/latest/userguide/optimizing.html

サブタスクに関するこのページも興味深いかもしれません: http://docs.celeryproject.org/en/latest/userguide/canvas.html .

実際、すべてのセロリのドキュメントを読むことをお勧めします。

于 2012-09-09T22:21:25.753 に答える
0

これから行うことは、ScheduledPost というモデルを作成することです。

5 分ごとに実行される PeriodicTask があります。

このタスクは、ScheduledPost テーブルをチェックして、Facebook にプッシュする必要がある投稿を探します。

于 2012-08-27T21:03:48.150 に答える