実際には、PIG または HIVE の行に行番号を追加するにはどうすればよいですか?という質問に関連しています。
srini が提供する 3 番目の回答は正常に機能しますが、udf の後にデータにアクセスするのに問題があります。
srini が提供する udf は次のとおりです。
import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
public class RowCounter extends EvalFunc<DataBag> {
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();
public DataBag exec(Tuple input) throws IOException {
try {
DataBag output = mBagFactory.newDefaultBag();
DataBag bg = (DataBag)input.get(0);
Iterator it = bg.iterator();
Integer count = new Integer(1);
while(it.hasNext())
{ Tuple t = (Tuple)it.next();
t.append(count);
output.add(t);
count = count + 1;
}
return output;
} catch (ExecException ee) {
// error handling goes here
throw ee;
}
}
public Schema outputSchema(Schema input) {
try{
Schema bagSchema = new Schema();
bagSchema.add(new Schema.FieldSchema("RowCounter", DataType.BAG));
return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
bagSchema, DataType.BAG));
}catch (Exception e){
return null;
}
}
}
次のように簡単なテスト豚スクリプトを書きました
A = load 'input.txt' using PigStorage(' ') as (name:chararray, age:int);
/*
--A: {name: chararray,age: int}
(amy,56)
(bob,1)
(bob,9)
(amy,34)
(bob,20)
(amy,78)
*/
B = group A by name;
C = foreach B {
orderedGroup = order A by age;
generate myudfs.RowCounter(orderedGroup) as t;
}
/*
--C: {t: {(RowCounter: {})}}
({(amy,34,1),(amy,56,2),(amy,78,3)})
({(bob,1,1),(bob,9,2),(bob,20,3)})
*/
D = foreach C generate FLATTEN(t);
/*
D: {t::RowCounter: {}}
(amy,34,1)
(amy,56,2)
(amy,78,3)
(bob,1,1)
(bob,9,2)
(bob,20,3)
*/
問題は、後の操作で D をどのように使用するかです。複数の方法を試しましたが、常に次のエラーが発生しました
ava.lang.ClassCastException: java.lang.String cannot be cast to org.apache.pig.data.DataBag
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:575)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:248)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:316)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:284)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:459)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:427)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:407)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:261)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
私の推測では、バッグ内にタプルのスキーマがないためです。これが理由である場合、どのように udf を変更すればよいですか?