Guyが述べたように入力を前処理するか、ここで説明されている他のトリックを適用できます。
最もクリーンな解決策は、1 つのレコード/START-END を作成するカスタムInputFormat (その RecordReader と共に) を実装することだと思います。Pig のLoadFuncは Hadoop の InputFormat の上にあるため、LoadFunc が使用する InputFormat を定義できます。
カスタム LoadFunc の未加工のスケルトン実装は次のようになります。
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CustomLoader extends LoadFunc {
private RecordReader reader;
private TupleFactory tupleFactory;
public CustomLoader() {
tupleFactory = TupleFactory.getInstance();
}
@Override
public InputFormat getInputFormat() throws IOException {
return new MyInputFormat(); //custom InputFormat
}
@Override
public Tuple getNext() {
Tuple result = null;
try {
if (!reader.nextKeyValue()) {
return null;
}
//value can be a custom Writable containing your name/value
//field pairs for a given record
Object value = reader.getCurrentValue();
result = tupleFactory.newTuple();
// ...
//append fields to tuple
}
catch (Exception e) {
// ...
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit pigSplit)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
}
はとそのをLoadFunc
初期化した後、データの入力場所を特定し、recordReader からレコードの取得を開始し、入力が完全に読み取られるまで、結果のタプル ( getNext() ) を作成します。InputFormat
RecordReader
カスタム InputFormat に関する注意事項:
RecordReader が の修正版であるカスタム InputFormat を作成します
org.apache.hadoop.mapreduce.lib.input.LineRecordReader
。 ほとんどのメソッドは同じままですが、例外はinitialize()
: カスタム LineReader (に基づくorg.apache.hadoop.util.LineReader
) を呼び出すことです。InputFormat のキーは行オフセット (Long) になり、値はカスタムの Writableになります。これは、キーと値のペアのリストとして、レコードのフィールド (つまり、START-END の間のデータ) を保持します。RecordReadernextKeyValue()
が呼び出されるたびに、レコードは LineReader によってカスタム Writable に書き込まれます。全体の要点は、実装方法です LineReader.readLine()
。
別のおそらくより簡単な方法は、TextInputFormat の区切り文字 (Hadoop 0.23 で構成可能です。 を参照textinputformat.record.delimiter
) をデータ構造に適したものに変更することです (可能な場合)。この場合、Text
KV ペアを分割して抽出し、タプルにする必要があるデータを取得することになります。