String フィールドである列があります。この文字列を読み取り、バッグに保存し、キーも与える必要があります (したがって、JSON として保存すると、一意の.
私のデータファイルの例は次のとおりです: "test, kyle"
出力を次のようにしたい: {"test":[{"key": "test"}, {"value":"kyle"}]}
public class BagTupleExampleUDF extends EvalFunc<DataBag> {
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();
int counter = 0;
static String fieldNames[] = {"key", "value"};
@Override
public DataBag exec(Tuple tuple) throws IOException {
// expect one string
if (tuple == null || tuple.size() == 0) {
throw new IllegalArgumentException("BagTupleExampleUDF: requires input parameters.");
}
try {
String key = (String) tuple.get(0);
String input = (String) tuple.get(1);
DataBag output = mBagFactory.newDefaultBag();
output.add(mTupleFactory.newTuple(Collections.singletonList(key)));
output.add(mTupleFactory.newTuple(Collections.singletonList(input)));
return output;
}
catch (Exception e) {
throw new IOException("BagTupleExampleUDF: caught exception processing input.", e);
}
}
public Schema outputSchema(Schema input) {
// Function returns a bag with this schema: { (Double), (Double) }
// Thus the outputSchema type should be a Bag containing a Double
try{
Schema bagSchema = new Schema();
String schemaName = getSchemaName(this.getClass().getName().toLowerCase(), input);
bagSchema.add(new Schema.FieldSchema(fieldNames[counter], DataType.CHARARRAY));
counter++;
return new Schema(new Schema.FieldSchema(schemaName, bagSchema, DataType.BAG));
}
catch (Exception e){
throw new RuntimeException(e);
}
}
私が抱えている問題は、次の行です。
bagSchema.add(new Schema.FieldSchema(fieldNames[counter], DataType.CHARARRAY));
基本的に、タプルから読み取った各値には異なる識別キーが必要です。これにより、これが Pig で終了したときに、追加したこれらの新しい列を参照できます。
私はまだ Pig、特に UDFS に慣れていないので、さらに情報が必要な場合はお知らせください。