1

こんにちは、Hadoop MR は初めてです。ノードから宛先ノードまでの最短パスをカウントする単純な MR ジョブを作成しようとしました。基本的にロジックは次のようになります。

入力テキスト ファイルに次のパスが指定されている場合: ABCD ABD ACD BED BD BACD

出力は次のようになります: ABD BD

これは、ノード A と D の間の最短パスと、B と D の間の最短パスを与えるだけです。

私が得ている出力は次のとおりです。 [ABCD ABD ACD BED BD BACD]

同じことをするために、次のMRを書きました。しかし、それは望ましい答えを与えていません。MR をスタンドアロン モードで実行しています。

コードの何が問題なのか、その解決策を教えてください。お時間をいただきありがとうございます。

public class Shpath {


    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] line = value.toString().split("\t");
            List<String> l = new ArrayList<String>();

            for(String lin :line){
                l.add(lin);
            }

            List <String>startEnd = new ArrayList<String>();
            for(String s : l){
                String g = s.substring(0,1)+s.substring((s.length())-1);
                if(!startEnd.contains(g))
                {
                    startEnd.add(g);
                }
            }

            List <String> uniqueStringList = new ArrayList<String>();
            java.util.Map finalMap = new HashMap();
            for(String s1 : startEnd){

                for(String s : l) {
                    if(s.startsWith(s1.substring(0,1)) && (s.endsWith(s1.substring((s1.length())-1)))){
                        uniqueStringList.add(s);
                    }
                 }
                 String smallestKey = null;
                 int minSize = Integer.MAX_VALUE;
                 String smallest = null;
                 for(String s2 : uniqueStringList){

                     if(s2.length() < minSize) {
                         minSize = s2.length();
                         smallest  = s2;
                         smallestKey  = s1;
                     }    
                     finalMap.put(s1,smallest);

                 }
                 uniqueStringList.clear();
            }output.collect(new Text(),new Text(finalMap.values().toString()));
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

           while (value.hasNext()){
               output.collect(new Text(key),new Text(value.next()));
           }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Shpath.class);
        conf.setJobName("shpath");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
        conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);

        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path(args[0]));
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
4

1 に答える 1

0

よくわかりませんが、次のようにする必要があります。

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        Map<String , HashMap<Integer, String> > outMap = new HashMap<String, HashMap<Integer, String> >();
        HashMap<Integer, String> tempMap = new HashMap<Integer, String>();
        tempMap.put(Integer.MAX_VALUE, "");
        outMap.put("AD", tempMap);
        outMap.put("BD", tempMap);

        String[] line = value.toString().split("\t");
        for (String path : line) {
            String tempPath = new String( new char[]{path.charAt(0) , path.charAt(path.length() - 1)});
            if(outMap.containsKey(tempPath)) {
                HashMap<Integer, String> tempOutMap = outMap.get(tempPath);
                for (Iterator itr =  tempOutMap.keySet().iterator(); itr.hasNext(); ) {
                    Integer count = (Integer) itr.next();
                    if(count > tempPath.length()){
                       tempMap.remove(count);
                       tempMap.put(tempPath.length(), tempPath);
                    }
                }
            }
        }
        for (String str : outMap.keySet()) {
          output.collect(new Text(str), new Text(outMap.get(str).values().toString()));    
        }        
    }


public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
           String outString;
           int smallest = Integer.MAX_VALUE;
           while (value.hasNext()){
               String str = value.next();
               if(str.length() < smallest) {
                  outString = str;
                  smallest = str.length();
               }
           }
           output.collect(new Text(key),new Text(outString));
    }
于 2013-02-06T12:20:16.173 に答える