123

によるHadoop - The Definitive Guide

FileInputFormats が定義する論理レコードは、通常、HDFS ブロックにうまく収まりません。たとえば、TextInputFormat の論理レコードは行であり、HDFS の境界を頻繁に越えます。これは、プログラムの機能には関係ありません。たとえば、行が欠落したり壊れたりすることはありませんが、知っておく価値はあります。入力データ) は、いくつかのリモート読み取りを実行します。これにより発生するわずかなオーバーヘッドは、通常は重要ではありません。

レコード行が 2 つのブロック (b1 と b2) に分割されているとします。最初のブロック (b1) を処理するマッパーは、最後の行に EOL セパレータがないことに気付き、次のデータ ブロック (b2) から残りの行をフェッチします。

2 番目のブロック (b2) を処理するマッパーは、最初のレコードが不完全であり、ブロック (b2) の 2 番目のレコードから処理を開始する必要があることをどのように判断しますか?

4

6 に答える 6

163

興味深い質問です。詳細についてコードを調べるのに時間を費やしました。ここに私の考えがあります。分割は によってクライアントによって処理されるInputFormat.getSplitsため、FileInputFormat を見ると次の情報が得られます。

  • 入力ファイルごとに、ファイルの長さ、ブロック サイズを取得し、分割サイズを計算max(minSize, min(maxSize, blockSize))します。maxSizemapred.max.split.sizeminSizemapred.min.split.size
  • FileSplit上で計算した分割サイズに基づいて、ファイルを異なる に分割します。ここで重要なのは、それぞれが入力ファイルのオフセットに対応するパラメーターFileSplitで初期化さstartれることです。その時点ではまだ回線の処理はありません。コードの関連部分は次のようになります。

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

その後、LineRecordReaderによって定義されているを見ると、TextInputFormatそこで行が処理されます。

  • あなたがあなたを初期化するとき、行を読むことができる抽象LineRecordReader化である a をインスタンス化しようとします。2 つのケースがあります。LineReaderFSDataInputStream
  • CompressionCodec定義されている場合、このコーデックは境界の処理を担当します。おそらくあなたの質問には関係ありません。
  • ただし、コーデックがない場合は、興味深い点です。 のstartInputSplit0 以外の場合、 1 文字バックトラックしてから、遭遇した最初の行をスキップし、\n または \r\n (Windows) で識別されます! 行の境界が分割の境界と同じ場合、有効な行をスキップしないようにするため、バックトラックは重要です。関連するコードは次のとおりです。

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

したがって、分割はクライアントで計算されるため、マッパーを順番に実行する必要はありません。すべてのマッパーは、最初の行を破棄する必要があるかどうかを既に認識しています。

したがって、基本的に、同じファイルに 100Mb ごとに 2 行ある場合、簡単にするために、分割サイズが 64Mb であるとします。次に、入力分割が計算されると、次のシナリオになります。

  • このブロックへのパスとホストを含​​む分割 1。開始時に初期化 200-200=0Mb、長さ 64Mb。
  • 開始時に初期化されたスプリット 2 200-200+64=64Mb、長さ 64Mb。
  • 開始時に初期化されたスプリット 3 200-200+128=128Mb、長さ 64Mb。
  • 開始時に初期化されたスプリット 4 200-200+192=192Mb、長さ 8Mb。
  • マッパー A は分割 1 を処理し、開始は 0 であるため最初の行をスキップせず、64Mb の制限を超えているためリモート読み取りが必要な行全体を読み取ります。
  • マッパー B は分割 2 を処理します。開始は != 0 なので、64Mb-1 バイトの後の最初の行をスキップします。これは、まだ分割 2 にある 100Mb の行 1 の終わりに対応します。分割 2 には 28Mb の行があるため、リモートは残りの 72Mb を読み取ります。
  • Mapper C は分割 3 を処理します。start は != 0 なので、128Mb-1 バイトの後の最初の行をスキップします。これは、ファイルの終わりである 200Mb の行 2 の終わりに対応するため、何もしないでください。
  • マッパー D は、192Mb-1 バイトの後に改行を探す点を除いて、マッパー C と同じです。
于 2013-01-26T18:44:05.087 に答える
18

Map Reduceアルゴリズムは、ファイルの物理ブロックでは機能しません。論理入力分割で機能します。入力分割は、レコードが書き込まれた場所によって異なります。レコードは 2 つのマッパーにまたがる場合があります。

HDFSのセットアップ方法では、非常に大きなファイルを大きなブロック (たとえば、128 MB の測定) に分割し、これらのブロックの 3 つのコピーをクラスター内の異なるノードに格納します。

HDFS は、これらのファイルの内容を認識しません。レコードはBlock-aで開始されている可能性がありますが、そのレコードの終わりはBlock-bに存在する場合があります。

この問題を解決するために、Hadoop は、入力分割と呼ばれる、ファイル ブロックに格納されたデータの論理表現を使用します。MapReduce ジョブ クライアントは、入力分割を計算するときに、ブロック内の最初のレコード全体が開始する場所と、ブロック内の最後のレコードが終了する場所を特定します

キーポイント:

ブロック内の最後のレコードが不完全な場合、入力分割には、次のブロックの位置情報と、レコードを完了するために必要なデータのバイト オフセットが含まれます。

下の図を見てください。

ここに画像の説明を入力

この記事と関連する SE の質問をご覧ください: Hadoop/HDFS ファイル分割について

詳細はドキュメントから読むことができます

Map-Reduce フレームワークは、ジョブの InputFormat に依存して次のことを行います。

  1. ジョブの入力仕様を検証します。
  2. 入力ファイルを論理的な InputSplit に分割し、それぞれを個別の Mapper に割り当てます。
  3. 各 InputSplit は、処理のために個々の Mapper に割り当てられます。Split は tuple である可能性がありますInputSplit[] getSplits(JobConf job,int numSplits) は、これらを処理するための API です。

InputFormat実装されたgetSplits() メソッドを拡張するFileInputFormat 。grepcodeでこのメソッドの内部を見てください。

于 2016-01-12T06:39:46.950 に答える
7

私はそれを次のように見ています: InputFormat は、データの性質を考慮して、データを論理的な分割に分割する責任があります。
これを妨げるものは何もありませんが、ジョブにかなりの待ち時間が追加される可能性があります。必要な分割サイズの境界に関するすべてのロジックと読み取りは、ジョブトラッカーで行われます。
最も単純なレコード対応の入力形式は TextInputFormat です。次のように機能しています(コードから理解している限り)-入力形式は、行に関係なくサイズごとに分割を作成しますが、LineRecordReaderは常に:
a)そうでない場合、分割の最初の行(またはその一部)をスキップします最初の分割
b) 最後に分割の境界の後の 1 行を読み取ります (データが使用可能な場合、最後の分割ではありません)。

于 2013-01-12T09:49:55.030 に答える
3

私が理解したことからFileSplit、最初のブロックに対して が初期化されると、デフォルトのコンストラクターが呼び出されます。したがって、start と length の値は最初はゼロです。最初のブロックの処理が終了するまでに、最後の行が不完全な場合、長さの値は分割の長さよりも大きくなり、次のブロックの最初の行も読み取られます。このため、最初のブロックの start の値は 0 より大きくなり、この状態では、LineRecordReaderは 2 番目のブロックの最初の行をスキップします。(ソースを参照)

最初のブロックの最後の行が完了すると、長さの値は最初のブロックの長さと等しくなり、2 番目のブロックの開始の値はゼロになります。その場合、LineRecordReaderは最初の行をスキップせず、最初から 2 番目のブロックを読み取ります。

理にかなっていますか?

于 2013-01-23T13:29:48.297 に答える
1

コンストラクターの LineRecordReader.java の Hadoop ソース コードから: いくつかのコメントが見つかりました:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

このことから、hadoop は分割ごとに余分な行を 1 つ読み取り (現在の分割の終わりに、次の分割で次の行を読み取る)、最初の分割でない場合は最初の行が破棄されると考えています。行レコードが失われて不完全にならないように

于 2015-01-27T16:18:47.033 に答える
0

マッパーは通信する必要はありません。ファイル ブロックは HDFS にあり、現在のマッパー (RecordReader) は行の残りの部分を含むブロックを読み取ることができます。これは舞台裏で行われます。

于 2014-04-06T00:01:42.063 に答える