`public class GenericUdafMemberLevel implements GenericUDAFResolver2 { private static final Log LOG = LogFactory .getLog(GenericUdafMemberLevel.class.getName());
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo)
throws SemanticException {
return new GenericUdafMeberLevelEvaluator();
}
@Override
//参数校验
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters.length != 2) {//参数大小
throw new UDFArgumentTypeException(parameters.length - 1,
"Exactly two arguments are expected.");
}
//参数必须是原型,即不能是
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed.");
}
if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(1,
"Only primitive type arguments are accepted but "
+ parameters[1].getTypeName() + " is passed.");
}
return new GenericUdafMeberLevelEvaluator();
}
public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {
private PrimitiveObjectInspector inputOI;
private PrimitiveObjectInspector inputOI2;
private DoubleWritable result;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
super.init(m, parameters);
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){
inputOI = (PrimitiveObjectInspector) parameters[0];
inputOI2 = (PrimitiveObjectInspector) parameters[1];
result = new DoubleWritable(0);
}
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
/** class for storing count value. */
static class SumAgg implements AggregationBuffer {
boolean empty;
double value;
}
@Override
//创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
//使用buffer对象前,先进行内存的清空——reset
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
SumAgg buffer = new SumAgg();
reset(buffer);
return buffer;
}
@Override
//重置为0
//mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
public void reset(AggregationBuffer agg) throws HiveException {
((SumAgg) agg).value = 0.0;
((SumAgg) agg).empty = true;
}
private boolean warned = false;
//迭代
//map阶段调用,只要把保存当前和的对象agg,再加上输入的参数,就可以了。
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
// parameters == null means the input table/split is empty
if (parameters == null) {
return;
}
try {
double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
if(flag > 1.0) //参数条件
merge(agg, parameters[0]); //这里将Map之后的操作,放入combiner进行合并
} catch (NumberFormatException e) {
if (!warned) {
warned = true;
LOG.warn(getClass().getSimpleName() + " "
+ StringUtils.stringifyException(e));
}
}
}
@Override
//combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
//通过ObejctInspector取每一个字段的数据
double p = PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
((SumAgg) agg).value += p;
}
}
@Override
//reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
public Object terminatePartial(AggregationBuffer agg)
throws HiveException {
return terminate(agg);
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
result.set(((SumAgg) agg).value);
return result;
}
}
}`
理論を理解するために、中国語を使用してコードにコメントを付けました。実際、UDAF の考え方は次のようなものです。 select test_sum(col1,col2) from tbl ; col2 が何らかの条件を満たす場合、col1 の値を合計します。ほとんどのコードは、公式の avg() udaf 関数からコピーされています。
私はちょっとした例外に遭遇しました:
java.lang.RuntimeException: Hive Runtime Error while closing operators
at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:226)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1132)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:193)
... 8 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:35)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:323)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:255)
at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:202)
at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:236)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:474)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:800)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:1061)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1113)
... 13 more
UDAF に何か問題がありますか?? よろしければご指摘ください。ありがとうございます。