0

この Hadoop マップは、グラフ データ (隣接リスト形式) で機能するコードを削減し、隣接リストから隣接リストへの変換アルゴリズムに似ています。メインの MapReduce タスク コードは次のとおりです。

public class TestTask extends Configured
implements Tool {

public static class TTMapper extends MapReduceBase
    implements Mapper<Text, TextArrayWritable, Text, NeighborWritable> {

    @Override
    public void map(Text key, 
            TextArrayWritable value,
            OutputCollector<Text, NeighborWritable> output, 
            Reporter reporter) throws IOException {

        int numNeighbors = value.get().length;
        double weight = (double)1 / numNeighbors;

        Text[] neighbors = (Text[]) value.toArray();

        NeighborWritable me = new NeighborWritable(key, new DoubleWritable(weight));

        for (int i = 0; i < neighbors.length; i++) {
            output.collect(neighbors[i], me);
        }   
    }       
}

public static class TTReducer extends MapReduceBase
    implements Reducer<Text, NeighborWritable, Text, Text> {

    @Override
    public void reduce(Text key, 
                        Iterator<NeighborWritable> values,
                        OutputCollector<Text, Text> output, 
                        Reporter arg3)
            throws IOException {

        ArrayList<NeighborWritable> neighborList = new ArrayList<NeighborWritable>();

        while(values.hasNext()) {
            neighborList.add(values.next());
        }

        NeighborArrayWritable neighbors = new NeighborArrayWritable
                            (neighborList.toArray(new NeighborWritable[0]));

        Text out = new Text(neighbors.toString());

        output.collect(key, out);

    }

}

@Override
public int run(String[] arg0) throws Exception {
    JobConf conf = Util.getMapRedJobConf("testJob",
                                         SequenceFileInputFormat.class, 
                                         TTMapper.class, 
                                         Text.class, 
                                         NeighborWritable.class, 
                                         1, 
                                         TTReducer.class, 
                                         Text.class, 
                                         Text.class, 
                                         TextOutputFormat.class, 
                                         "test/in", 
                                         "test/out");
    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new TestTask(), args);
    System.exit(res);
}

}

補助コードは次のとおりです: TextArrayWritable:

public class TextArrayWritable extends ArrayWritable {
public TextArrayWritable() {
    super(Text.class);
}

public TextArrayWritable(Text[] values) {
    super(Text.class, values);
}

}

ネイバー書き込み可能:

public class NeighborWritable implements Writable {

private Text nodeId;
private DoubleWritable weight;

public NeighborWritable(Text nodeId, DoubleWritable weight) {
    this.nodeId = nodeId;
    this.weight = weight;
}

public NeighborWritable () { }

public Text getNodeId() {
    return nodeId;
}

public DoubleWritable getWeight() {
    return weight;
}

public void setNodeId(Text nodeId) {
    this.nodeId = nodeId;
}

public void setWeight(DoubleWritable weight) {
    this.weight = weight;
}

@Override
public void readFields(DataInput in) throws IOException {
    nodeId = new Text();
    nodeId.readFields(in);

    weight = new DoubleWritable();
    weight.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
    nodeId.write(out);
    weight.write(out);
}

public String toString() {
    return "NW[nodeId=" + (nodeId != null ? nodeId.toString() : "(null)") +
        ",weight=" + (weight != null ? weight.toString() : "(null)") + "]";
}

public boolean equals(Object o) {
    if (!(o instanceof NeighborWritable)) {
        return false;
    }

    NeighborWritable that = (NeighborWritable)o;

    return (nodeId.equals(that.getNodeId()) && (weight.equals(that.getWeight())));
}

}

Util クラス:

public class Util {

public static JobConf getMapRedJobConf(String jobName,
                                              Class<? extends InputFormat> inputFormatClass,
                                              Class<? extends Mapper> mapperClass,
                                              Class<?> mapOutputKeyClass,
                                              Class<?> mapOutputValueClass,
                                              int numReducer,
                                              Class<? extends Reducer> reducerClass,
                                              Class<?> outputKeyClass,
                                              Class<?> outputValueClass,
                                              Class<? extends OutputFormat> outputFormatClass,
                                              String inputDir,
                                              String outputDir) throws IOException {

    JobConf conf = new JobConf();

    if (jobName != null)
        conf.setJobName(jobName);

    conf.setInputFormat(inputFormatClass);

    conf.setMapperClass(mapperClass);

    if (numReducer == 0) {
        conf.setNumReduceTasks(0);

        conf.setOutputKeyClass(outputKeyClass);
        conf.setOutputValueClass(outputValueClass);

        conf.setOutputFormat(outputFormatClass);

    } else {
        // may set actual number of reducers
        // conf.setNumReduceTasks(numReducer);

        conf.setMapOutputKeyClass(mapOutputKeyClass);
        conf.setMapOutputValueClass(mapOutputValueClass);

        conf.setReducerClass(reducerClass);

        conf.setOutputKeyClass(outputKeyClass);
        conf.setOutputValueClass(outputValueClass);

        conf.setOutputFormat(outputFormatClass);

    }

    // delete the existing target output folder
    FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(outputDir), true);


    // specify input and output DIRECTORIES (not files)
    FileInputFormat.addInputPath(conf, new Path(inputDir));
    FileOutputFormat.setOutputPath(conf, new Path(outputDir));

    return conf;        

}

}

私の入力は次のグラフです: (バイナリ形式で、ここではテキスト形式を指定しています)

1   2
2   1,3,5
3   2,4
4   3,5
5   2,4

コードのロジックによると、出力は次のようになります。

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],NW[nodeId=1,weight=1.0],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]

しかし、出力は次のようになります。

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]

期待した出力が出ない理由がわかりません。どんな助けでも大歓迎です。

ありがとう。

4

2 に答える 2

3

オブジェクトの再利用に違反しています

while(values.hasNext()) {
    neighborList.add(values.next());
}

values.next()は同じオブジェクト参照を返しますが、そのオブジェクトの基になるコンテンツは反復ごとに変更されます (readFieldsコンテンツを再設定するためにメソッドが呼び出されます)。

conf修正することをお勧めします ( Reporter または OutputCollector から取得できない場合を除き、セットアップ メソッドから構成変数を取得する必要があります。古い API は使用していません)。

while(values.hasNext()) {
    neighborList.add(
        ReflectionUtils.copy(conf, values.next(), new NeighborWritable());
}
于 2012-07-12T14:32:37.390 に答える
0

しかし、なぜ私の単体テストが合格したのか、まだ理解できません。ここにコードがあります -

public class UWLTInitReducerTest {

private Text key;
private Iterator<NeighborWritable> values;
private NeighborArrayWritable nodeData;
private TTReducer reducer;

/**
 * Set up the states for calling the map function
 */
@Before
public void setUp() throws Exception {
    key = new Text("1001");
    NeighborWritable[] neighbors = new NeighborWritable[4];
    for (int i = 0; i < 4; i++) {
        neighbors[i] = new NeighborWritable(new Text("300" + i), new DoubleWritable((double) 1 / (1 + i)));
    }

    values = Arrays.asList(neighbors).iterator();

    nodeData = new NeighborArrayWritable(neighbors);

    reducer = new TTReducer();

}

/**
 * Test method for InitModelMapper#map - valid input
 */
@Test
public void testMapValid() {

    // mock the output object
    OutputCollector<Text, UWLTNodeData> output = mock(OutputCollector.class);

    try {
        // call the API
        reducer.reduce(key, values, output, null);

        // in order (sequential) verification of the calls to output.collect()
        verify(output).collect(key, nodeData);

    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

}

このコードでバグが検出されなかったのはなぜですか?

于 2012-07-12T19:17:52.323 に答える