2

App Engine Mapreduce API は、最終的な削減ジョブの独自のロジックに従ってコンピューティング シャード サイズを決定しますか?

私は App Engine mapreduce API を使用しており、kwarg を提供してshard_size mapreduce シャード サイズを設定しています。

reduce 関数の最終ステップを実行する際に、あまり多くの結果を 1 つのバッチにまとめたくないため、シャード サイズは mapreduce ジョブで特に重要です。つまり、システムの外部制約に従ってユーザーを均等に分割するために、シャード サイズをハードコーディングしています。

マップ ジョブは問題なくシャードアウトしているように見えますが、リデューサーは指定したシャードの一部しか使用しません。

私が扱っている種類のコードの大まかな概要は次のとおりです。

SHARD_SIZE = 42

def map_fun(entity):
  shard_key = random.randint(1, SHARD_SIZE)
  yield (
    shard_key,
    db.model_to_protobuf(entity).SerializeToString().encode('base64')
  )

def reduce_fun(key, entities):
  batch = []
  for entity in entities:
    #check for stuff
    batch.append(entity)
  expensive_side_effect(batch)


class MyGreatPipeline(base_handler.PipelineBase):
  def run(self, *args, **kw):
    yield mapreduce_pipeline.MapreducePipeline(
      'label'
      'path.to.map_fun',
      'path.to.reduce_fun',
      'mapreduce.input_readers.DatastoreInputReader',
      'mapreduce.output_writers.BlobstoreOutputWriter',
      mapper_params={
        'entity_kind': 'path.to.entity',
        'queue_name': 'coolQueue'
      },
      reducer_params={},
      shard_size = SHARD_SIZE
    )

map_fun具体的には、シャード サイズに応じてランダムに決定されるシャードを各エンティティに割り当てます。SHARD_SIZE多くのエンティティがあり、同じ整数が繰り返し選択される可能性が非常に低いのに、レデューサーのシャードが少ない理由について混乱しています。

4

1 に答える 1

0

あなたがここで何をしているのか、私は困惑しています。map フェーズを使用して小さなシャード キーにグループ化し、後でそれらのキーを短時間で処理するのは奇妙に見えます。マッパー ワーカーと同じ数のリデュース ワーカーを使用したとしても、キーごとに行う作業が多すぎることになります。

処理されている「バッチ」はランダムに任意であるためexpensive_side_effect()、バッチの内容に依存しないと思います。代わりにマップ時にその作業を行い、reduced が出力ライターに渡すことができる何かを発行しないのはなぜですか?

于 2012-06-22T04:35:25.133 に答える