2

各タプルが複数の行にまたがるファイルがあります。次に例を示します。

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

上記は私のファイルの2つのタプルです。getNext()START かどうかをチェックする関数を定義した UDF を作成してから、タプルを初期化します。END の場合は、(文字列バッファーから) タプルを返します。それ以外の場合は、文字列を文字列バッファーに追加します。

ファイルサイズが 64 MB (Amazon EMR の場合) である HDFS ブロックサイズよりも小さい場合はうまく機能しますが、これよりも大きいサイズでは失敗します。私はググってみましたが、このブログ投稿を見つけてください。Raja の説明は理解しやすく、サンプル コードを提供してくれました。しかし、コードはfor pigRecordReaderの代わりにその部分を実装しています。複数行の豚のタプル分割問題を処理した経験がある人がいるかどうか疑問に思っていますか? Pigで実装する必要がありますか?もしそうなら、どのように?getNext()LoadFuncRecordReader

ありがとう。

4

2 に答える 2

1

Guyが述べたように入力を前処理するか、ここで説明されている他のトリックを適用できます。

最もクリーンな解決策は、1 つのレコード/START-END を作成するカスタムInputFormat (その RecordReader と共に) を実装することだと思います。Pig のLoadFuncは Hadoop の InputFormat の上にあるため、LoadFunc が使用する InputFormat を定義できます。
カスタム LoadFunc の未加工のスケルトン実装は次のようになります。

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new MyInputFormat(); //custom InputFormat
    }

    @Override
    public Tuple getNext() {
        Tuple result = null;
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            //value can be a custom Writable containing your name/value 
            //field pairs for a given record
            Object value = reader.getCurrentValue();
            result = tupleFactory.newTuple();
            // ...
            //append fields to tuple
        }
        catch (Exception e) {
            // ...
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) 
      throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

はとそのをLoadFunc初期化した後、データの入力場所を特定し、recordReader からレコードの取得を開始し、入力が完全に読み取られるまで、結果のタプル ( getNext() ) を作成します。InputFormatRecordReader

カスタム InputFormat に関する注意事項:

RecordReader が の修正版であるカスタム InputFormat を作成します org.apache.hadoop.mapreduce.lib.input.LineRecordReader。 ほとんどのメソッドは同じままですが、例外はinitialize(): カスタム LineReader (に基づくorg.apache.hadoop.util.LineReader) を呼び出すことです。InputFormat のキーは行オフセット (Long) になり、値はカスタムの Writableになります。これは、キーと値のペアのリストとして、レコードのフィールド (つまり、START-END の間のデータ) を保持します。RecordReadernextKeyValue()が呼び出されるたびに、レコードは LineReader によってカスタム Writable に書き込まれます。全体の要点は、実装方法です LineReader.readLine()

別のおそらくより簡単な方法は、TextInputFormat の区切り文字 (Hadoop 0.23 で構成可能です。 を参照textinputformat.record.delimiter) をデータ構造に適したものに変更することです (可能な場合)。この場合、TextKV ペアを分割して抽出し、タプルにする必要があるデータを取得することになります。

于 2012-12-14T13:54:50.997 に答える