2

appengine-mapreduceライブラリの単純なcontrol.start_map()関数を使用して、mapreduce ジョブを開始しています。このジョブは正常に完了し、結果のページに最大 4,300 万回のマッパー呼び出しが表示されます。ただし、このページでは、reduce ステップや、まだ実行されていると思われる基になる appengine-pipeline プロセスについては言及していません。この呼び出しが作成するパイプライン ID を返す方法はありますか?これにより、基になるパイプラインを調べて、この長時間実行されるジョブのデバッグに役立てることができますか? このページを表示するのに十分な情報を取得したいと思います: /mapreduce/detail?mapreduce_id=<my_id>/mapreduce/pipeline/status?root=<guid>

これは、最初に mapreduce ジョブを開始するために使用しているコードの例です。

from third_party.mapreduce import control
mapreduce_id = control.start_map(
    name="Backfill",
    handler_spec="mark_tos_accepted",
    reader_spec=(
        "third_party.mapreduce.input_readers.DatastoreInputReader"),
    mapper_parameters={
        "input_reader": {
            "entity_kind": "ModelX"
        },
    },
    shard_count=64,
    queue_name="backfill-mapreduce-queue",
 )

マッピング関数は次のとおりです。

# This is where we keep our copy of appengine-mapreduce
from third_party.mapreduce import operation as op

def mark_tos_accepted(modelx):
    # Skip users who have already been marked
    if (not modelx
        or modelx.tos_accepted == myglobals.LAST_MATERIAL_CHANGE_TO_TOS):
    return

    modelx.tos_accepted = user_models.LAST_MATERIAL_CHANGE_TO_TOS
    yield op.db.Put(modelx)

ModelX の関連部分は次のとおりです。

class BackupModel(db.Model):
    backup_timestamp = db.DateTimeProperty(indexed=True, auto_now=True)

class ModelX(BackupModel):
    tos_accepted = db.IntegerProperty(indexed=False, default=0)

詳細については、データ ウェアハウスに表示される書き込みで発生している問題をデバッグしようとしています。

A2013 年 3 月 23 日、約 4,300 万のエンティティを含む db.Model ( と呼びましょう) に対してMapReduce ジョブ ( と呼びましょう) を開始しましたModelX。7 時間後、ジョブは「終了」し、/mapreduce/detail以下に示すように、すべてのエンティティが正常にマッピングされたことがページに表示されました。

mapper-calls: 43613334 (1747.47/sec avg.)

2013 年 3 月 31 日、別の MapReduce ジョブ (B としましょう) をModelX. 12 時間後、ジョブは成功ステータスで終了し、 /mapreduce/detail以下に示すように、すべてのエンティティが正常にマッピングされたことがページに表示されました。

mapper-calls: 43803632 (964.24/sec avg.)

以前はどのエンティティにも含まれていなかった新しいプロパティを導入したため、MR ジョブ A がすべてのModelXエンティティに書き込みを行ったことはわかっています。そのModelXような auto_add プロパティが含まれています。

backup_timestamp = ndb.DateTimeProperty(indexed=True, auto_now=True)

当社のデータ ウェアハウジング プロセスではModelX、特定の日に変更されたエンティティを見つけるためにクエリを実行し、それらのエンティティをダウンロードして別の (AWS) データベースに保存し、分析を実行できるようにします。このクエリの例は次のとおりです。

db.GqlQuery('select * from ModelX where backup_timestamp >= DATETIME(2013, 4, 10, 0, 0, 0) and backup_timestamp < DATETIME(2013, 4, 11, 0, 0, 0) order by backup_timestamp')

私たちのデータ ウェアハウスには、MR ジョブが完了した各日に約 4,300 万のエンティティがあると予想されますが、実際には約 3,000,000 に近く、次の進行状況に示されているように、その後の各日は増加を示しています。

3/16/13 230751
3/17/13 193316
3/18/13 344114
3/19/13 437790
3/20/13 443850
3/21/13 640560
3/22/13 612143
3/23/13 547817
3/24/13 2317784  // Why isn't this ~43M ?
3/25/13 3701792  // Why didn't this go down to ~500K again?
3/26/13 4166678
3/27/13 3513732
3/28/13 3652571

これは、mapreduce ジョブによって発行された op.db.Put() 呼び出しがまだ何らかのパイプラインまたはキューで実行されており、このトリクル効果を引き起こしていると思われます。

さらに、古い を使用してエンティティをクエリすると、backup_timestampかなり前に戻っても多くのエンティティを取得できますが、これらのクエリはすべて 0 を返すと予想されます。

In [4]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,2,23,1,1,1)').count()
Out[4]: 1000L

In [5]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2013,1,23,1,1,1)').count()
Out[5]: 1000L

In [6]: ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,23,1,1,1)').count()
Out[6]: 1000L

ただし、クエリが返すべきではないエンティティを返すという奇妙な動作があります。

In [8]: old = ModelX.all().filter('backup_timestamp <', 'DATETIME(2012,1,1,1,1,1)')

In [9]: paste
for o in old[1:100]:
  print o.backup_timestamp
## -- End pasted text --
2013-03-22 22:56:03.877840
2013-03-22 22:56:18.149020
2013-03-22 22:56:19.288400
2013-03-22 22:56:31.412290
2013-03-22 22:58:37.710790
2013-03-22 22:59:14.144200
2013-03-22 22:59:41.396550
2013-03-22 22:59:46.482890
2013-03-22 22:59:46.703210
2013-03-22 22:59:57.525220
2013-03-22 23:00:03.864200
2013-03-22 23:00:18.040840
2013-03-22 23:00:39.636020

これは、インデックスの更新に時間がかかっているだけだと思います。

私たちのデータ ウェアハウスがダウンロードするエンティティの数もグラフにしましたが、崖のような落ち込みが見られることに気付きました。 appengine ダッシュボード。たとえば、このグラフは、mapreduce ジョブを開始した 3 月 23 日にかなり大きなスパイクがあったことを示していますが、その後すぐに劇的な低下を示しています。

このグラフは、BackupTimestamp GqlQuery各日の 10 分間隔ごとに返されるエンティティの数を示しています。紫色の線は、MapReduce ジョブがスピンアップするにつれて大きなスパイクを示し、その後スロットリングが開始されると約 1 時間後に劇的な低下を示していることに注意してください。このグラフは、時間ベースのスロットリングが行われているように見えることも示しています。

1 日あたり 10 分ごとのデータストア読み取り

4

3 に答える 3

2

マッパーを起動しただけなので、リデューサー関数はないと思います。完全な mapreduce を行うには、明示的に a をインスタンス化し、MapReducePipelineそれを呼び出す必要がありstartます。おまけとして、これはステータス URL で使用できるパイプライン ID を返すため、質問に答えます。

于 2013-05-10T21:23:57.993 に答える