それはかなり一般的な問題です。何を選択すればよいかわかりません。
フィールドがあります:id、creationDate、state、dateDiff
idは自然キーです。
レデューサーに入る必要があります:
KEY(id)、VALUE(作成日、状態、dateDiff)
VALUE(creationDate, state, dateDiff) は次の基準でソートする必要があります: creationDate, state
どのキーを選択する必要がありますか? 複合キー (id、creationDate、state) を作成しました
ID によるパーティショナーを実装しました
IDによるグルーパー
ID、作成日、状態によるソーター
私のレデューサーは一意のIDのみを取得します...例:
1 123 true 6
1 456 false 6
1 789 true 7
私は得るだけ
1 123 true 6
私の減速機で。私はソーター、パーティショナー、グルーパーを取得していないようです:(理解のラックがあります。
これが私のコードです:
public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue>{
private static final Log LOG = LogFactory.getLog(POIMapper.class);
@Override
public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
}
}
public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text>{
private static final Log LOG = LogFactory.getLog(POIReducer.class);
private final Text textForOutput = new Text();
@Override()
public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
throws IOException, InterruptedException {
XVLROutput out = null;
//Just check that values are correctly attached to keys. No logic here...
LOG.info("\nPOIReducer: key:"+key);
for(XVLRValue value : values){
LOG.info("\n --- --- --- value:"+value+"\n");
textForOutput.set(print(key, value));
context.write(key.getMsisdn(), textForOutput);
}
}
private String print(XVLRKey key, XVLRValue value){
StringBuilder builder = new StringBuilder();
builder.append(value.getLac()) .append("\t")
.append(value.getCellId()) .append("\t")
.append(key.getDateOccurrence()) .append("\t")
.append(value.getTimeDelta());
return builder.toString();
}
}
ジョブコード:
JobBuilder<POIJob> jobBuilder = createTestableJobInstance();
jobBuilder.withOutputKey(XVLRKey.class);
jobBuilder.withOutputValue(XVLRValue.class);
jobBuilder.withMapper(POIMapper.class);
jobBuilder.withReducer(POIReducer.class);
jobBuilder.withInputFormat(TextInputFormat.class);
jobBuilder.withOutputFormat(TextOutputFormat.class);
jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);
boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
MatcherAssert.assertThat(result, Matchers.is(true));
public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {
@Override
public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
return Math.abs(key.getMsisdn().hashCode() * 127) % numPartitions;
}
}
public class XVLRCompositeKeyComparator extends WritableComparator {
protected XVLRCompositeKeyComparator() {
super(XVLRKey.class, true);
}
@Override
public int compare(WritableComparable writable1, WritableComparable writable2) {
XVLRKey key1 = (XVLRKey) writable1;
XVLRKey key2 = (XVLRKey) writable2;
return key1.compareTo(key2);
}
}
public class XVLRKeyGroupingComparator extends WritableComparator {
protected XVLRKeyGroupingComparator() {
super(XVLRKey.class, true);
}
@Override
public int compare(WritableComparable writable1, WritableComparable writable2) {
XVLRKey key1 = (XVLRKey) writable1;
XVLRKey key2 = (XVLRKey) writable2;
return key1.getMsisdn().compareTo(key2.getMsisdn());
}
}
public class XVLRKey implements WritableComparable<XVLRKey>{
private final LongWritable msisdn;
private final LongWritable dateOccurrence;
private final BooleanWritable state;
//getters-setters
}
public class XVLRValue implements WritableComparable<XVLRValue> {
private final LongWritable lac;
private final LongWritable cellId;
private final LongWritable timeDelta;
private final LongWritable dateOccurrence;
private final BooleanWritable state;
//getters-setterrs
}
XVLRKey、XVLRValue に重複したフィールドがあることを確認してください。レデューサーでソートされた値を取得したいので、XVLRKey で dateOccurrence と state を複製しました。それらはdateOccurrenceでソートする必要があります。
重複せずにこの問題を解決する方法が見つかりません。