Pig (0.10) に hdfs から hbase にデータをロードさせています。生のレコードには一意の行キーがないため、UDF を構成します。
public class Foo extends EvalFunc<Tuple> {
// FIXME: If there are multiple map jobs for the same batch,
// they will reuse the serial numbers.
// Need to add something to figure out a distinct per task #
private int task_id=0;
private long serial=0L;
public Tuple exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
try {
Integer batch_id=(Integer)input.get(0);
String rowkey=String.format("%7d%3d%9d", batch_id, task_id, serial++);
// ... compute other values for the return Tuple.
}
}
}
私の理解では、pig が同じ入力データ セットに対して 2 つの異なるマップ ジョブを開始した場合 (チャンクサイズを超えたか、ディレクトリからロードするときに複数の入力ファイルがあるため)、それぞれが個別の Java インスタンスになり、したがってFoo.serial の複数の独立したコピーが存在します。行キーは一意ではなく、HBase にロードしようとしている多くのレコードを上書きします。
UDF がどのマッパーの一部であるかを判別できれば、衝突はなくなります。IP アドレス + プロセス ID にフォールバックすることもできますが、それはかなり無駄です。