0

それはかなり一般的な問題です。何を選択すればよいかわかりません。

フィールドがあります:id、creationDate、state、dateDiff

id自然キーです。

レデューサーに入る必要があります:

KEY(id)、VALUE(作成日、状態、dateDiff)

VALUE(creationDate, state, dateDiff) は次の基準でソートする必要があります: creationDate, state

どのキーを選択する必要がありますか? 複合キー (id、creationDate、state) を作成しました

ID によるパーティショナーを実装しました

IDによるグルーパー

ID、作成日、状態によるソーター

私のレデューサーは一意のIDのみを取得します...例:

1 123 true  6
1 456 false 6
1 789 true  7

私は得るだけ

1 123 true  6

私の減速機で。私はソーター、パーティショナー、グルーパーを取得していないようです:(理解のラックがあります。

これが私のコードです:

public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue>{

    private static final Log LOG = LogFactory.getLog(POIMapper.class);

    @Override
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
        Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
        context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
    }

}

public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text>{

    private static final Log LOG = LogFactory.getLog(POIReducer.class);

    private final Text textForOutput = new Text();

    @Override()
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
                                                                            throws IOException, InterruptedException {
        XVLROutput out = null;
//Just check that values are correctly attached to keys. No logic here...
        LOG.info("\nPOIReducer: key:"+key);
        for(XVLRValue value : values){
            LOG.info("\n --- --- --- value:"+value+"\n");
            textForOutput.set(print(key, value));
            context.write(key.getMsisdn(), textForOutput);
        }
    }

    private String print(XVLRKey key, XVLRValue value){
        StringBuilder builder = new StringBuilder();
        builder.append(value.getLac())          .append("\t")
               .append(value.getCellId())       .append("\t")
               .append(key.getDateOccurrence()) .append("\t")
               .append(value.getTimeDelta());
        return builder.toString();
    }
}

ジョブコード:

JobBuilder<POIJob> jobBuilder = createTestableJobInstance();

        jobBuilder.withOutputKey(XVLRKey.class);
        jobBuilder.withOutputValue(XVLRValue.class);

        jobBuilder.withMapper(POIMapper.class);
        jobBuilder.withReducer(POIReducer.class);

        jobBuilder.withInputFormat(TextInputFormat.class);
        jobBuilder.withOutputFormat(TextOutputFormat.class);

        jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
        jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
        jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);

        boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
        MatcherAssert.assertThat(result, Matchers.is(true));




public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {

    @Override
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
            return Math.abs(key.getMsisdn().hashCode() * 127) % numPartitions;
    }
}

public class XVLRCompositeKeyComparator extends WritableComparator {

    protected XVLRCompositeKeyComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
       return key1.compareTo(key2);
    }
}

public class XVLRKeyGroupingComparator extends WritableComparator {

    protected XVLRKeyGroupingComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {

        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;

        return key1.getMsisdn().compareTo(key2.getMsisdn());

    }
}

public class XVLRKey implements WritableComparable<XVLRKey>{

    private  final LongWritable msisdn;
    private  final LongWritable dateOccurrence;
    private  final BooleanWritable state;
//getters-setters
}

public class XVLRValue implements WritableComparable<XVLRValue> {

    private final LongWritable lac;
    private final LongWritable cellId;
    private final LongWritable timeDelta;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
//getters-setterrs
}

XVLRKey、XVLRValue に重複したフィールドがあることを確認してください。レデューサーでソートされた値を取得したいので、XVLRKey で dateOccurrence と state を複製しました。それらはdateOccurrenceでソートする必要があります。

重複せずにこの問題を解決する方法が見つかりません。

4

1 に答える 1

0

二次ソートの状況(あなたが説明するような)では、イテレータから次の値を取得するときに、持っているキーの値が変更されます。

これは、Hadoopフレームワークがオブジェクトのインスタンスを再利用して、オブジェクトの作成とガベージコレクションを可能な限り回避するために発生します。

したがって、「next()」を呼び出すと、フレームワークはキーインスタンス内のデータも変更します。

だからあなたが移動した場合

    LOG.info("\nPOIReducer: key:"+key);

forループ内にあるようにステートメントを記述して、すべてのキーが来るのを確認する必要があります。

この効果のために、私は基本的に次の「ルール」で仕事をします。

キーは、値を適切なレデューサーに転送するためにフレームワークによってのみ使用されます。

これは、

  1. 私が必要とするかもしれないすべては価値の中に存在しなければなりません。
  2. レデューサーでは、値のみを確認し、常にキーを破棄/無視します。
  3. キーの作成に使用されるプロパティは、値にもあります。
于 2013-01-17T16:25:52.897 に答える