0

Pig (0.10) に hdfs から hbase にデータをロードさせています。生のレコードには一意の行キーがないため、UDF を構成します。

public class Foo extends EvalFunc<Tuple> {
    // FIXME: If there are multiple map jobs for the same batch,
    // they will reuse the serial numbers.
    // Need to add something to figure out a distinct per task #
    private int task_id=0;
    private long serial=0L;

    public Tuple exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            Integer batch_id=(Integer)input.get(0);
            String rowkey=String.format("%7d%3d%9d", batch_id, task_id, serial++);
            // ... compute other values for the return Tuple.
        }
    }
}

私の理解では、pig が同じ入力データ セットに対して 2 つの異なるマップ ジョブを開始した場合 (チャンクサイズを超えたか、ディレクトリからロードするときに複数の入力ファイルがあるため)、それぞれが個別の Java インスタンスになり、したがってFoo.serial の複数の独立したコピーが存在します。行キーは一意ではなく、HBase にロードしようとしている多くのレコードを上書きします。

UDF がどのマッパーの一部であるかを判別できれば、衝突はなくなります。IP アドレス + プロセス ID にフォールバックすることもできますが、それはかなり無駄です。

4

1 に答える 1

0

DataFuコレクションの列挙型 UDF を見てください。これはバッグを取り、各要素に 1 から N までの番号を割り当てます。ここで、N はバッグのサイズです。これの残念な副作用は、すべてのデータが 1 つのレデューサーを通過する必要があることです。しかし、あなたの説明からすると、これは大きな問題ではないように思えます。(データは、複数のマッパーに分割する必要があるほど大きい場合があるようです。)

を使用してすべてのデータを 1 つのバッグにグループ化し、GROUP ... ALLこのバッグを列挙するだけです。次に、この番号を使用して、バッグ内の各レコードに固有のカスタム行キーを作成できます。

于 2013-04-01T21:11:16.510 に答える