2

次のようなバイナリ形式で記述された大規模な時系列データ ファイルを処理する M/R ジョブを作成しています (読みやすくするためにここに新しい行があり、実際のデータは明らかに連続しています)。

TIMESTAMP_1---------------------TIMESTAMP_1
TIMESTAMP_2**********TIMESTAMP_2 
TIMESTAMP_3%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%TIMESTAMP_3
.. etc

ここで、timestamp は単に 8 バイトの構造体であり、最初の 2 バイトで識別できます。実際のデータは、上に表示されているように、重複した値のタイムスタンプの間に境界があり、1 つ以上の事前定義された構造体が含まれています。キーと値のペアをマッパーに発行するカスタム InputFormat を作成したいと思います。

< TIMESTAMP_1, --------------------- >
< TIMESTAMP_2, ********** >
< TIMESTAMP_3, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% >

論理的には、TIMESTAMP現在TIMESTAMP<TIMESTAMP, DATA>. 私の問題は、 内の分割間の同期でRecordReaderあるため、特定のリーダーが次の分割を受信した場合

# a split occurs inside my data
reader X: TIMESTAMP_1--------------
reader Y: -------TIMESTAMP_1 TIMESTAMP_2****..

# or inside the timestamp
or even: @@@@@@@TIMES
         TAMP_1-------------- ..

これにアプローチする良い方法は何ですか?CustomRecordReader分割間で同期でき、データを失わないように、ファイル オフセットにアクセスする簡単な方法はありますか? 分割の処理方法について概念的なギャップがあると思うので、これらの説明が役立つかもしれません。ありがとう。

4

2 に答える 2

3

一般に、分割をサポートする入力形式を作成するのは簡単ではありません。一貫性のあるレコードを取得するために、分割境界からどこに移動するかを見つけることができるはずだからです。XmlInputFormatは、そうするフォーマットの良い例です。
分割可能な入力が本当に必要かどうかを最初に検討することをお勧めしますか?入力フォーマットを分割不可として定義することができ、これらすべての問題が発生することはありません。
ファイルが一般的にブロックサイズよりもそれほど大きくない場合、何も失うことはありません。そうすると、データの局所性の一部が失われます。

于 2012-05-10T12:39:19.180 に答える
2

FileInputFormatたとえば、の具体的なサブクラスをサブクラス化しSeqenceFileAsBinaryInputFormat、メソッドをオーバーライドしてisSplitable()を返すことができfalseます。

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;

public class NonSplitableBinaryFile extends SequenceFileAsBinaryInputFormat{

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
      return false;
  }

  @Override
  public RecordReader getRecordReader(InputSplit split, JobConf job,
  Reporter reporter) throws IOException {
    //return your customized record reader here
  }
}
于 2012-05-10T11:43:40.847 に答える