1

2 つのコレクション間に多対多のマッピング テーブルがあります。マッピング テーブルの各行は、重みスコアで可能なマッピングを表します。

mapping(id1, id2, weight)

クエリ: id1 と id2 の間の 1 対 1 のマッピングを生成します。最小の重みを使用して、重複するマッピングを削除します。同点の場合は任意のものを出力する。

入力例:

(1, X, 1)
(1, Y, 2)
(2, X, 3)
(2, Y, 1)
(3, Z, 2)

出力

(1, X)
(2, Y)
(3, Z)

1 と 2 は両方とも X と Y にマッピングされます。マッピング (1, X) と (2, Y) を選択します。

4

2 に答える 2

2

重みがid1を含むマッピングの中で最も低く、id2を含むマッピングの中で最も低いマッピングにのみ関心があると想定します。たとえば、マッピング(2、Y、4)が追加されている場合、(1、X、1)と競合することはありません。重みが失格となった(1、Y、2)および(2、X、3)よりも小さいため、このようなマッピングは除外します。

私の解決策は次のように進行します。各id1の最小マッピング重みを見つけ、それを将来の参照用にマッピング関係に結合します。ネストされたforeachを使用して各id2を調べます。ORDERとLIMITを使用して、そのid2の重みが最小のレコードを選択し、重みがそのid1の最小値でもある場合にのみ保持します。

入力でテストされた完全なスクリプトは次のとおりです。

mapping = LOAD 'input' AS (id1:chararray, id2:chararray, weight:double);

id1_weights =
    FOREACH (GROUP mapping BY id1)
    GENERATE group AS id1, MIN(mapping.weight) AS id1_min_weight;
mapping_with_id1_mins =
    FOREACH (JOIN mapping BY id1, id1_weights BY id1)
    GENERATE mapping::id1, id2, weight, id1_min_weight;

accepted_mappings =
    FOREACH (GROUP mapping_with_id1_mins BY id2)
    {
        ordered = ORDER mapping_with_id1_mins BY weight;
        selected = LIMIT ordered 1;
        acceptable = FILTER selected BY weight == id1_min_weight;
        GENERATE FLATTEN(acceptable);
    };

DUMP accepted_mappings;
于 2012-11-08T22:53:01.167 に答える
0

Java UDF を使用して解決しました。1 対 1 マッピングの数を最大化しないという意味では完全ではありませんが、十分です。

豚:

d = load 'test' as (fid, iid, priority:double);
g = group d by fid;
o = foreach g generate FLATTEN(com.propeld.pig.DEDUP(d)) as (fid, iid, priority);
store o into 'output';

g2 = group o by iid;
o2 = foreach g2 generate FLATTEN(com.propeld.pig.DEDUP(o)) as (fid, iid, priority);
store o2 into 'output2';

Java UDF:

package com.propeld.pig;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DEDUP extends EvalFunc<Tuple> implements Algebraic{
    public String getInitial() {return Initial.class.getName();}
    public String getIntermed() {return Intermed.class.getName();}
    public String getFinal() {return Final.class.getName();}
    static public class Initial extends EvalFunc<Tuple> {
        private static TupleFactory tfact = TupleFactory.getInstance();
        public Tuple exec(Tuple input) throws IOException {
            // Initial is called in the map.
            // we just send the tuple down
            try {
                // input is a bag with one tuple containing
                // the column we are trying to operate on
                DataBag bg = (DataBag) input.get(0);
                if (bg.iterator().hasNext()) {
                    Tuple dba = (Tuple) bg.iterator().next();
                    return dba;
                } else {
                    // make sure that we call the object constructor, not the list constructor
                    return tfact.newTuple((Object) null);
                }
            } catch (ExecException e) {
                throw e;
            } catch (Exception e) {
                int errCode = 2106;
                throw new ExecException("Error executing an algebraic function", errCode, PigException.BUG, e);
            }
        }
    }
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return dedup(input);
        }
    }
    static public class Final extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {return dedup(input);}
    }

    static protected Tuple dedup(Tuple input) throws ExecException, NumberFormatException {
        DataBag values = (DataBag)input.get(0);
        Double min = Double.MAX_VALUE;
        Tuple result = null;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = (Tuple) it.next();

            if ((Double)t.get(2) < min){
                min = (Double)t.get(2);
                result = t;
            }
        }
        return result;
    }

    @Override
    public Tuple exec(Tuple input) throws IOException {
        return dedup(input);
    }
}
于 2012-11-08T22:49:18.627 に答える