5

ViewFlow ライブラリで次のユース ケースを実装したいと思います。

問題

ユーザーによって開始された特定のフローのプロセスは、セロリ ジョブを実行する前にキューで待機する必要があります。各ユーザーには、これらのプロセスのキューがあります。スケジュールに基づいて、または手動でトリガーして、キュー内の次のプロセスを続行できます。

フロー内のノードが名前付きキューに入ります。アプリケーション内の他のロジックは、キューごとに、いつ次のタスクを続行できるかを決定します。キュー内の次のタスクが選択され、そのアクティベーションの done() メソッドが呼び出されます。

フローの例は次のようになります。

class MyFlow(Flow):

    start = flow.Start(...).Next(queue_wait)
    queue_wait = QueueWait("myQueue").Next(job)
    job = celery.Job(...).Next(end)
    end = flow.End()

質問

キューイングを実装するための最良のアプローチは何ですか? 上記の例では、「QueueWait」がどうあるべきかわかりません。

ドキュメントとビューフロー コードを読みましたが、組み込みの Node および Activation クラス (func.Function など) を使用してこれを実行できるかどうか、またはカスタム クラスで拡張する必要があるかどうかはまだ明確ではありません。

4

1 に答える 1

4

多くの実験の後、実行可能でシンプルな解決策にたどり着きました。

from viewflow.flow import base
from viewflow.flow.func import FuncActivation
from viewflow.activation import STATUS


class Queue(base.NextNodeMixin,
            base.UndoViewMixin,
            base.CancelViewMixin,
            base.DetailsViewMixin,
            base.Event):

    """
    Node that halts the flow and waits in a queue. To process the next waiting task
    call the dequeue method, optionally specifying the task owner.

    Example placing a job in a queue::

        class MyFlow(Flow):
            wait = Queue().Next(this.job)
            job = celery.Job(send_stuff).Next(this.end)
            end = flow.End()

        somewhere in the application code:
        MyFlow.wait.dequeue()
        or:
        MyFlow.wait.dequeue(process__myprocess__owner=user)

    Queues are logically separated by the task_type, so new queues defined in a
    subclass by overriding task_type attribute.
    """

    task_type = 'QUEUE'
    activation_cls = FuncActivation

    def __init__(self, **kwargs):
        super(Queue, self).__init__(**kwargs)

    def dequeue(self, **kwargs):
        """
        Process the next task in the queue by created date/time. kwargs is
        used to add task filter arguments, thereby effectively splitting the queue
        into subqueues. This could be used to implement per-user queues.

        Returns True if task was found and dequeued, False otherwise
        """
        filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW}
        if kwargs is not None:
            filter_kwargs.update(kwargs)

        task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first()
        if task is not None:
            lock = self.flow_cls.lock_impl(self.flow_cls.instance)
            with lock(self.flow_cls, task.process_id):
                task = self.flow_cls.task_cls._default_manager.get(pk=task.pk)
                activation = self.activation_cls()
                activation.initialize(self, task)
                activation.prepare()
                activation.done()
            return True

        return False

可能な限り一般的なものにして、複数の名前付きキューと、ユーザーごとのキューなどのサブキューの定義をサポートするようにしました。

于 2015-07-29T16:52:13.083 に答える