4

いくつかのタスクをパイプラインとして書き直したいと思います。主な理由は、タスクがいつ終了したか、特定の順序でタスクを開始したかを検出する方法が必要だからです。私の問題は、再帰タスクをパイプラインに書き直す方法がわからないことです。再帰的とは、次のように自分自身を呼び出すタスクを意味します。

class MyTask(webapp.RequestHandler):
    def post(self):
        cursor = self.request.get('cursor', None)

        [set cursor if not null]
        [fetch 100 entities form datastore]

        if len(result) >= 100:
            [ create the same task in the queue and pass the cursor ]

        [do actual work the task was created for]

今、私はそれをパイプラインとして書き、次のようなことをしたいと思っています:

class DoSomeJob(pipeline.Pipeline):

   def run(self):
       with pipeline.InOrder():
          yield MyTask()
          yield MyOtherTask()
          yield DoSomeMoreWork(message2)

これについての助けは大歓迎です。ありがとうございました!

4

2 に答える 2

12

基本的なパイプラインは値を返すだけです:

class MyFirstPipeline(pipeline.Pipeline):
    def run(self):
        return "Hello World"  

値は JSON シリアライズ可能である必要があります。

複数のパイプラインを調整する必要がある場合は、ジェネレーター パイプラインyieldステートメントを使用する必要があります。

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        yield MyFirstPipeline()

パイプラインの譲歩を'future'を返すかのように扱うことができます。

この未来を入力引数として別のパイプラインに渡すことができます。

class MyGeneratorPipeline(pipeline.Pipeline):
    def run(self):
        result = yield MyFirstPipeline()
        yield MyOtherPipeline(result)

Pipeline API は、 future fromが実際の値に解決された場合にのみ のrunメソッドがMyOtherPipeline呼び出されるようにします。resultMyFirstPipeline

yieldreturnを同じ方法で混合することはできません。値を使用しyieldている場合は、Pipeline インスタンスである必要があります。これを行う場合、これは問題につながる可能性があります。

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield results

この場合、Pipeline API は最終行のリストyield resultsを見るだけなので、返される前にリスト内の先物を解決することを知らず、エラーが発生します。

それらは文書化されていませんが、ここで役立つユーティリティ パイプラインのライブラリが含まれています:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py

したがって、実際に機能する上記のバージョンは次のようになります。

import pipeline
from pipeline import common

class MyRootPipeline(pipeline.Pipeline):
    def run(self, *input_args):
        results = []
        for input_arg in input_args:
            intermediate = yield MyFirstPipeline(input_arg)
            result = yield MyOtherPipeline(intermediate)
            results.append(result)
        yield common.List(*results)

これで問題ありません。パイプライン インスタンスが生成され、Pipeline API はその将来の値を適切に解決することを認識しています。パイプラインのソースcommon.Listは非常に単純です。

class List(pipeline.Pipeline):
    """Returns a list with the supplied positional arguments."""

    def run(self, *args):
        return list(args)

...このパイプラインのメソッドが呼び出された時点でrun、Pipeline API はリスト内のすべてのアイテムを実際の値に解決しており、これを として渡すことができます*args

とにかく、元の例に戻ると、次のようなことができます。

class FetchEntitites(pipeline.Pipeline):
    def run(self, cursor=None)
        if cursor is not None:
            cursor = Cursor(urlsafe=cursor)

        # I think it's ok to pass None as the cursor here, haven't confirmed
        results, next_curs, more = MyModel.query().fetch_page(100,
                                                              start_cursor=cursor)

        # queue up a task for the next page of results immediately
        future_results = []
        if more:
            future_results = yield FetchEntitites(next_curs.urlsafe())

        current_results = [ do some work on `results` ]

        # (assumes current_results and future_results are both lists)
        # this will have to wait for all of the recursive calls in
        # future_results to resolve before it can resolve itself:
        yield common.Extend(current_results, future_results)

さらなる説明

result = yield MyPipeline()最初に、「未来」を返すかのように扱うことができると言いました。これは厳密には正しくありません。明らかに、実際にはインスタンス化されたパイプラインを生成しているだけです。(言うまでもなく、runメソッドはジェネレーター関数になりました。)

Pythonyield 式yieldがどのように機能するかの奇妙な部分は、そのように見えるにもかかわらず、値がvar. 式の左側の var の値も、ジェネレーターを呼び出すことによって、関数の外部からプッシュされます(ジェネレーターは、定義したメソッドです)。resultresultsendrun

したがって、インスタンス化された Pipeline を生成することにより、Pipeline API がそのインスタンスを取得し、そのrunメソッドを別の場所で呼び出すことができます (実際には、クラス名と args と kwargs と re のセットとしてタスク キューに渡されます)。 -そこでインスタンス化されます...これが、引数とkwargsもJSONシリアル化可能である必要がある理由です)。

一方、Pipeline API はオブジェクトをジェネレーターにsend渡し、これがvarに表示されます。少し不思議で直感に反するように思えますが、これが yield 式を持つジェネレーターの仕組みです。PipelineFuturerunresult

このレベルに到達するにはかなりの頭を悩ませましたが、間違っていた点についての説明や修正を歓迎します。

于 2014-09-02T09:57:58.940 に答える
3

パイプラインを作成すると、「ステージ」を表すオブジェクトが返されます。ステージにその ID を尋ねて、それを保存することができます。後で、保存された ID からステージを再構成し、完了したかどうかをステージに尋ねることができます。

http://code.google.com/p/appengine-pipeline/wiki/GettingStartedを参照して を探しますhas_finalized。必要なことのほとんどを行う例があります。

于 2012-07-06T22:37:15.617 に答える