私は MapReduce (実際にはマップするだけです) を使用して、4 つのフェーズでデータ処理タスクを実行しています。各フェーズは 1 つの MapReduce ジョブです。順番に実行する必要があります。つまり、フェーズ 1 が完了するまでフェーズ 2 を開始しないでください。共有できる経験がある人はいますか?
理想的には、この 4 つのジョブ シーケンスを一晩で実行するので、cron 対応にすることも問題ありません。
ありがとうございました
私は MapReduce (実際にはマップするだけです) を使用して、4 つのフェーズでデータ処理タスクを実行しています。各フェーズは 1 つの MapReduce ジョブです。順番に実行する必要があります。つまり、フェーズ 1 が完了するまでフェーズ 2 を開始しないでください。共有できる経験がある人はいますか?
理想的には、この 4 つのジョブ シーケンスを一晩で実行するので、cron 対応にすることも問題ありません。
ありがとうございました
Daniel が言及しているように、appengine-pipeline ライブラリはこの問題を解決するためのものです。このブログ記事の「独自のパイプライン ジョブの実装」セクションで、mapreduce ジョブの連鎖について説明します。
便宜上、関連するセクションをここに貼り付けます。
定義済みの MapreducePipeline を起動する方法がわかったので、独自のカスタム パイプライン ジョブを実装して実行する方法を見てみましょう。パイプライン ライブラリは、appengine 内で任意の分散コンピューティング ジョブを起動するための低レベル ライブラリを提供しますが、ここでは、これを使用して mapreduce ジョブを連鎖させる方法について具体的に説明します。前の例を拡張して、文字と ID の逆インデックスも出力してみましょう。
まず、親パイプライン ジョブを定義します。
class ChainMapReducePipeline(mapreduce.base_handler.PipelineBase):
def run(self):
deduped_blob_key = (
yield mapreduce.mapreduce_pipeline.MapreducePipeline(
"test_combiner",
"main.map",
"main.reduce",
"mapreduce.input_readers.RandomStringInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
combiner_spec="main.combine",
mapper_params={
"string_length": 1,
"count": 500,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16))
char_to_id_index_blob_key = (
yield mapreduce.mapreduce_pipeline.MapreducePipeline(
"test_chain",
"main.map2",
"main.reduce2",
"mapreduce.input_readers.BlobstoreLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
# Pass output from first job as input to second job
mapper_params=(yield BlobKeys(deduped_blob_key)),
reducer_params={
"mime_type": "text/plain",
},
shards=4))
これにより、最初の例と同じジョブが起動され、そのジョブからの出力が取得され、それが 2 番目のジョブにフィードされ、各エントリが逆になります。最初のパイプライン yield の結果が 2 番目のジョブの mapper_params に渡されることに注意してください。パイプライン ライブラリはマジックを使用して、2 番目のパイプラインが最初のパイプラインの終了に依存していることを検出し、deduped_blob_key が解決されるまでパイプラインを起動しません。
次に、BlobKeys ヘルパー クラスを作成する必要がありました。最初は、これが必要だとは思いませんでした。
mapper_params={"blob_keys": deduped_blob_key},
しかし、これは 2 つの理由で機能しませんでした。1 つ目は、「ジェネレーター パイプラインは、それが生成する子パイプラインの出力に直接アクセスできない」ことです。上記のコードでは、ジェネレーター パイプラインが最初のジョブの出力で一時的な dict オブジェクトを作成する必要がありますが、これは許可されていません。2 つ目は、BlobstoreOutputWriter によって返される文字列の形式は「/blobstore/」ですが、BlobstoreLineInputReader は単に「」を想定していることです。これらの問題を解決するために、小さなヘルパー BlobKeys クラスを作成しました。多くのジョブでこれを行っていることに気付くでしょう。パイプライン ライブラリには一連の一般的なラッパーも含まれていますが、このセクションの最後で説明する MapreducePipeline フレームワーク内では動作しません。
class BlobKeys(third_party.mapreduce.base_handler.PipelineBase):
"""Returns a dictionary with the supplied keyword arguments."""
def run(self, keys):
# Remove the key from a string in this format:
# /blobstore/<key>
return {
"blob_keys": [k.split("/")[-1] for k in keys]
}
map2 および reduce2 関数のコードは次のとおりです。
def map2(data):
# BlobstoreLineInputReader.next() returns a tuple
start_position, line = data
# Split input based on previous reduce() output format
elements = line.split(" - ")
random_id = elements[0]
char = elements[1]
# Swap 'em
yield (char, random_id)
def reduce2(key, values):
# Create the reverse index entry
yield "%s - %s\n" % (key, ",".join(values))
私は google-app-engine に詳しくありませんが、すべてのジョブ構成を 1 つのメイン プログラムに入れて、それらを順番に実行することはできませんか? 次のようなものですか?これは通常の map-reduce プログラムで機能すると思うので、google-app-engine のコードがあまり変わらなければ問題なく機能するはずです。
Configuration conf1 = getConf();
Configuration conf2 = getConf();
Configuration conf3 = getConf();
Configuration conf4 = getConf();
//whatever configuration you do for the jobs
Job job1 = new Job(conf1,"name1");
Job job2 = new Job(conf2,"name2");
Job job3 = new Job(conf3,"name3");
Job job4 = new Job(conf4,"name4");
//setup for the jobs here
job1.waitForCompletion(true);
job2.waitForCompletion(true);
job3.waitForCompletion(true);
job4.waitForCompletion(true);
まさにこれを目的としたappengine-pipelineプロジェクトが必要です。