20

これが私の設定です:

  • django 1.3
  • セロリ2.2.6
  • django-セロリ2.2.4
  • djkombu 0.9.2

私のsettings.pyファイルには

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

つまり、データベースを使用してタスクをキューに入れているだけです。

次に、私の問題について説明します。ユーザーが開始したタスクがあり、完了するまでに数分かかる場合があります。タスクをユーザーごとに1回だけ実行したいので、タスクの結果を一時ファイルにキャッシュするので、ユーザーがタスクを再度開始した場合は、キャッシュされたファイルを返すだけです。ビュー関数に次のようなコードがあります。

task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)

if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")

このコードはほとんど機能します。しかし、ユーザーがページをすばやくリロードすると、問題が発生します。タスクがキューに入れられてから、タスクが最終的にキューから削除されてワーカーに渡されるまでに、1〜3秒の遅延があります。この間、タスクの状態はPENDINGのままであるため、ビューロジックは重複するタスクを開始します。

必要なのは、タスクがすでにキューに送信されているかどうかを確認する方法です。これにより、タスクを2回送信することになりません。セロリでこれを行う標準的な方法はありますか?

4

3 に答える 3

5

これはRedisで解決しました。タスクごとにredisにキーを設定してから、タスクのafter_returnメソッドのredisからキーを削除するだけです。Redisは軽量で高速です。

于 2011-11-09T14:18:07.483 に答える
5

(Tomekや他の人が示唆しているように)データベースを使用することがこのロックを行う方法だとは思いません。djangoにはキャッシュフレームワークが組み込まれています。これは、このロックを実行するのに十分であり、より高速である必要があります。見る:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Djangoはmemcached、キャッシュバックエンドとして使用するように構成でき、これを複数のマシンに分散できます...これは私には良いようです。考え?

于 2013-07-05T03:53:42.967 に答える
1

結果をデータベースに手動で保存することで、少しごまかすことができます。これがどのように役立つかを説明しましょう。

たとえば、RDBMS(列のあるテーブル-task_id、state、result)を使用する場合:

パーツを表示:

  1. トランザクション管理を使用します。
  2. SELECT FOR UPDATEを使用して、task_id == "long-task-%d"%user_idの行を取得します。SELECT FOR UPDATEは、このリクエストがCOMMITまたはROLLBACKするまで、他のリクエストをブロックします。
  3. 存在しない場合は、状態をPENDINGに設定し、「some_long_task」を開始して、リクエストを終了します。
  4. 状態がPENDINGの場合-ユーザーに通知します。
  5. 状態がSUCCESSの場合-状態をPENDINGに設定し、タスクを開始して、「result」列が指すファイルを返します。これは、結果を得たときにタスクを再実行したいという前提に基づいています。専念
  6. 状態がERRORの場合-状態をPENDINGに設定し、タスクを開始して、ユーザーに通知します。専念

タスクパート:

  1. ファイルを準備し、tryでラップし、ブロックをキャッチします。
  2. 成功した場合-state=SUCCESS、resultで適切な行を更新します。
  3. 失敗した場合-状態=ERRORで適切な行を更新します。
于 2011-06-29T22:16:49.020 に答える