2

Java UDF を使用してバッグ内のタプルをランク付けする Java UDF を作成しようとしています。タプルには、ランキングの基準となる値の列と、最初は 0 に設定されているランクの列があります。タプルは、値の列に基づいて並べ替えられます。すべてのタプルはバッグに入れられ、そのバッグは UDF に渡される新しいタプルの中に入れられます。

ただし、UDF はランク列を変更しています。メソッドが終了すると、値はすべて再び 0 になります。値を「固定」する方法がわかりません。

どんな助けでも大歓迎です。

ここに私のJavaクラスがあります

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker  extends EvalFunc<String>{
    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }


        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer)list.get(1);

        Iterator<Tuple>itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous=0;
        while (itr.hasNext()) {

            Tuple t= itr.next();
            double d = (Double)t.get(num.intValue());
            int rankCol = t.size()-1;
            Integer rankVal = (Integer)t.get(rankCol);
            if(i == 0){    
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
            } else {
                if(d == previous)
                    t.set(rankCol, i);
                else{
                    System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer)t.get(rankCol);
                     System.out.println("|now rank val" + rankVal);
                    previous = d;
                }
            }
        }


        return "Y";
    }
}

これが、Pigですべてを呼び出す方法です-

REGISTER /myJar.jar;
A = LOAD '/Users/Winter/milk-tea-coffee.tsv'  as (year:chararray, milk:double);
B = foreach A generate year, milk, 0 as rank;
C = order B by milk asc; 
D = group C by rank order C by milk;
E = foreach D generate D.C.year,D.C.milk,D.C.rank,  piglet3.evalFunctions.Ranker(D.C,1);
dump E;

UDF 内の print ステートメントにより、UDF 内で動作していることがわかります。 =前|21.9|21.6|0|現在ランクval4 d!==前|22.0|21.9|0|現在ランクval5 d!==前|22.5|22.0|0|現在ランクval6 d!==前|22.9| 22.5|0|現在ランクval7d!==前回|23.0|22.9|0|現在ランクval8d!==前回|23.4|23.0|0|現在ランクval9d!==前回|23.8|23.4|0|現在ランク val10 d!==前|23.9|23.8|0|現在 ランク val11

しかし、E または D または C をダンプすると、ランク列には 0 しか含まれません。

4

1 に答える 1

1

exec 関数は、必要な出力を UDF から返す必要があります。現在、exec 関数に渡されているタプルを変更してから、文字列 "Y" を返しています。Pig が UDF からの出力として見るのは "Y" だけです。この場合、「Y」の代わりに Tuple を返す必要があります。

次のコードはあなたの意図に近いと思いますが、何をしようとしているのかよくわかりません:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.logicalLayer.FrontendException;
import java.util.Iterator;
import org.apache.pig.PigWarning;

/**
 *
 * @author Winter
 */
public class Ranker  extends EvalFunc<Tuple>{
    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }


        List<Object> list = tuple.getAll();
        DataBag db = (DataBag) list.get(0);
        Integer num = (Integer)list.get(1);

        Iterator<Tuple>itr = db.iterator();
        boolean containsNonNull = false;
        int i = 1;
        double previous=0;
        while (itr.hasNext()) {

            Tuple t= itr.next();
            double d = (Double)t.get(num.intValue());
            int rankCol = t.size()-1;
            Integer rankVal = (Integer)t.get(rankCol);
            if(i == 0){    
                System.out.println("i==0");
                previous = d;
                t.set(rankCol, i);
            } else {
                if(d == previous)
                    t.set(rankCol, i);
                else{
                    System.out.print("d!==previous|" + d + "|"+ previous+"|"+rankVal);
                    t.set(rankCol, ++i);
                    rankVal = (Integer)t.get(rankCol);
                     System.out.println("|now rank val" + rankVal);
                    previous = d;
                }
            }
        }


        return tuple;
    }
}
于 2012-04-05T06:59:52.960 に答える