0

入力を読み取ってデータベースに書き込むマッパーがあります。実際に変換されてそのデータベースに書き込まれる入力の数を制限したいのですが、すべてのマッパーは制限に貢献し、その制限に達したら停止する必要があります (およそ; 1 つまたは 2 つの追加は大したことではありません)。

マッパーにリミッター機能を実装して、他のタスクに「インポートしたレコード数は?」と尋ねます。所定の制限に達すると、それらのレコードのインポートが停止されます (ただし、他の目的のために処理は続行されます)。

問題のマップ コードは次のようになります。

public void map(ImmutableBytesWritable key, Result row, Context context) {
  // prepare the input
  // ...

  if (context.getCounter(Metrics.IMPORTED).getValue()<IMPORT_LIMIT){
    importRecord();
    context.getCounter(Metrics.IMPORTED).increment(1l);
  }

  // do other things
  // ...
}

そのため、各マッパーはインポートする余地があるかどうかを確認し、制限に達していない場合にのみインポートを実行します。ただし、各マッパー自体は制限までインポートしているため、16 マッパーの場合、16*IMPORT_LIMIT レコードがインポートされます。間違いなくいくつかの制限を行っています (インポートされたレコードの通常の数よりもはるかに少ない数です)。

カウンタ値が他のマッパーにプッシュされるのはいつですか? または、各マッパーで使用できるようになるのはいつですか? カウンターから実際にリアルタイムの値を取得できますか、それともマッパーが終了したときにのみ更新されますか? マッパー間で値を共有するより良い方法はありますか?

4

1 に答える 1

0

わかりました: 私が見た限りでは、MapReduce はジョブが完了するまでマッパー間でカウンターを共有しません (つまり、まったく共有しません)。途中でコミットしたマッパーが後のマッパーにカウンターを表示できるようにするかどうかはわかりませんが、しかし、リアルタイムで行うには十分な信頼性がありません。

代わりに、単純な Java アプリケーションを実行して行を反復処理し、既存の MapReduce ジョブが行をインポートするかどうかを決定するために使用する列に書き込みます。

于 2012-06-25T15:08:08.803 に答える