基本的なパイプラインは値を返すだけです:
class MyFirstPipeline(pipeline.Pipeline):
def run(self):
return "Hello World"
値は JSON シリアライズ可能である必要があります。
複数のパイプラインを調整する必要がある場合は、ジェネレーター パイプラインとyield
ステートメントを使用する必要があります。
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
yield MyFirstPipeline()
パイプラインの譲歩を'future'を返すかのように扱うことができます。
この未来を入力引数として別のパイプラインに渡すことができます。
class MyGeneratorPipeline(pipeline.Pipeline):
def run(self):
result = yield MyFirstPipeline()
yield MyOtherPipeline(result)
Pipeline API は、 future fromが実際の値に解決された場合にのみ のrun
メソッドがMyOtherPipeline
呼び出されるようにします。result
MyFirstPipeline
yield
とreturn
を同じ方法で混合することはできません。値を使用しyield
ている場合は、Pipeline インスタンスである必要があります。これを行う場合、これは問題につながる可能性があります。
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield results
この場合、Pipeline API は最終行のリストyield results
を見るだけなので、返される前にリスト内の先物を解決することを知らず、エラーが発生します。
それらは文書化されていませんが、ここで役立つユーティリティ パイプラインのライブラリが含まれています:
https://code.google.com/p/appengine-pipeline/source/browse/trunk/src/pipeline/common.py
したがって、実際に機能する上記のバージョンは次のようになります。
import pipeline
from pipeline import common
class MyRootPipeline(pipeline.Pipeline):
def run(self, *input_args):
results = []
for input_arg in input_args:
intermediate = yield MyFirstPipeline(input_arg)
result = yield MyOtherPipeline(intermediate)
results.append(result)
yield common.List(*results)
これで問題ありません。パイプライン インスタンスが生成され、Pipeline API はその将来の値を適切に解決することを認識しています。パイプラインのソースcommon.List
は非常に単純です。
class List(pipeline.Pipeline):
"""Returns a list with the supplied positional arguments."""
def run(self, *args):
return list(args)
...このパイプラインのメソッドが呼び出された時点でrun
、Pipeline API はリスト内のすべてのアイテムを実際の値に解決しており、これを として渡すことができます*args
。
とにかく、元の例に戻ると、次のようなことができます。
class FetchEntitites(pipeline.Pipeline):
def run(self, cursor=None)
if cursor is not None:
cursor = Cursor(urlsafe=cursor)
# I think it's ok to pass None as the cursor here, haven't confirmed
results, next_curs, more = MyModel.query().fetch_page(100,
start_cursor=cursor)
# queue up a task for the next page of results immediately
future_results = []
if more:
future_results = yield FetchEntitites(next_curs.urlsafe())
current_results = [ do some work on `results` ]
# (assumes current_results and future_results are both lists)
# this will have to wait for all of the recursive calls in
# future_results to resolve before it can resolve itself:
yield common.Extend(current_results, future_results)
さらなる説明
result = yield MyPipeline()
最初に、「未来」を返すかのように扱うことができると言いました。これは厳密には正しくありません。明らかに、実際にはインスタンス化されたパイプラインを生成しているだけです。(言うまでもなく、run
メソッドはジェネレーター関数になりました。)
Pythonのyield 式yield
がどのように機能するかの奇妙な部分は、そのように見えるにもかかわらず、値がvar. 式の左側の var の値も、ジェネレーターを呼び出すことによって、関数の外部からプッシュされます(ジェネレーターは、定義したメソッドです)。result
result
send
run
したがって、インスタンス化された Pipeline を生成することにより、Pipeline API がそのインスタンスを取得し、そのrun
メソッドを別の場所で呼び出すことができます (実際には、クラス名と args と kwargs と re のセットとしてタスク キューに渡されます)。 -そこでインスタンス化されます...これが、引数とkwargsもJSONシリアル化可能である必要がある理由です)。
一方、Pipeline API はオブジェクトをジェネレーターにsend
渡し、これがvarに表示されます。少し不思議で直感に反するように思えますが、これが yield 式を持つジェネレーターの仕組みです。PipelineFuture
run
result
このレベルに到達するにはかなりの頭を悩ませましたが、間違っていた点についての説明や修正を歓迎します。