6

celery 3.0のチェーンを使用して作成された実行中のワークフローを一時停止/再開する方法はありますか?

基本的に、システムには2つの異なるタイプのタスクがあります。インタラクティブタスクと非インタラクティブタスクです。非インタラクティブなものにはすべてのパラメーターがありますが、インタラクティブなものにはユーザー入力が必要です。インタラクティブタスクの場合、結果がインタラクティブタスクに影響するため、チェーン内の以前のすべてのタックが完了した後にのみユーザー入力を要求できることに注意してください(つまり、実際のチェーンを作成する前にユーザー入力を要求することはできません)。

これにアプローチする方法について何か提案はありますか?ここで本当に途方に暮れています。

現在のアイデア:

  • Taskの2つのサブクラスを作成します(celery import Taskから)。インタラクティブタスクサブクラスに追加のインスタンス(クラスメンバー)変数を追加します。これはデフォルトでfalseに設定されており、ユーザー入力がまだ必要であることを表します。どういうわけか、タスクのインスタンスにアクセスし、セロリワーカーの外部からtrueに設定します(これをかなり調べたので、別のモジュールから直接タスクオブジェクトにアクセスすることはできないようです)
  • チェーンをインタラクティブジョブで区切られた複数のチェーンに分割します。チェーンが終了に達したら、セロリワーカーの外部にある種のメカニズムを検出して、インタラクティブタスクのインタラクティブクライアント側コンポーネントをトリガーします。ユーザーがこのすべてのデータを入力したら、データを取得し、インタラクティブタスクが新しいチェーンの先頭にある新しいチェーンを開始します。
4

1 に答える 1

2

私たちはあなたの2番目のアイデアのようなものを私たちのプロジェクトに実装しました、そしてそれはうまくいきます。これが実装の要点です。

モデルに新しいフィールドを追加statusし、saveメソッドをオーバーライドします。

models.py:

class My_Model(models.Model):
    # some fields
    status = models.IntegerField(default=0)

    def save(self, *args, **kwargs):
        super(My_Model, self).save(*args, **kwargs)
        from .functions import custom_func
        custom_func(self.status)

tasks.py

@celery.task()
def non_interactive_task():
    #do something.

@celery.task()
def interactive_task():
    #do something.

features.py

def custom_func(status):
    #Change status after non interactive task is completed.
    #Based on status, start interactive task.

statusユーザーが情報を入力するためのUI要素を表示するのに役立つテンプレートに変数を渡します。ユーザーが必要な情報を入力したら、ステータスを変更します。これによりcustom_func、がトリガーされますinteractive_task

于 2014-09-03T13:23:07.967 に答える