3

これが私のMap

 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String[] fields = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                context.write(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

そしてここに私のReduce

public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0.0;
            int count = 0;

            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }

            context.write(new Text(key), new DoubleWritable(sum/count));
        }

設定方法は次のとおりです

Job job = new Job(getConf());

            job.setJarByClass(AverageByAttributeUsingCombiner.class);
            job.setJobName("AverageByAttributeUsingCombiner");

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(MapClass.class);
    //        job.setCombinerClass(Combinber.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //        job.setNumReduceTasks(0); // to not run the reducer
            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;

入力は形式です

   "PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD│                                                                                                                                                                                                                
    ","SECDLWBD"                                                                                                                                                                                                         │                                                                                                                                                                                                                
    3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,                                                                                                                                                                 │                                                                                                                                                                                                                
    3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,                                                                                                                                                                  │                                                                                                                                                                                                                
    3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,                                                                                                                                                            │                                                                                                                                                                                                                
    3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,        

全体の出力は次のmap reduceようになります

"AR"5,1│<br>"AR"9,1│<br>"AR"2,1│<br>"AR"151│<br>"AR"13,1│<br>"AR"1,1│<br>"AR"34、1│<br>"AR"12,1│<br>"AR"8,1│<br>"AR"7、1│<br>"AR"23,1│<br>"AR"3,1│<br>"AR"41│<br>「AR」4,1

この問題をデバッグして修正するにはどうすればよいですか?私はHadoopを学んでいます

4

3 に答える 3

4

すでに述べたように、問題は、デフォルトの抽象Reducerクラスのデフォルトのreduceメソッドをオーバーライドしていないことです。

より具体的には、これまでの(1つ/の)問題は、reduceメソッドのシグネチャが次のとおりであるということです。

 public void reduce(Text key, **Iterator**<Text> values, Context context) 
             throws IOException, InterruptedException

代わりに、次のようになります。

 public void reduce(Text key, **Iterable**<Text> values, Context context) 
             throws IOException, InterruptedException

古いAPIバージョンは正しく、Reducerインターフェースreduce()メソッドを実装すれば機能します。

この種の状況の適切な検証は@Override、署名の不一致のコンパイル時チェックを強制するため、を使用することです。

于 2012-07-31T21:33:44.687 に答える
1

あなたのレデューサーは「キャッチ」していません。型の不一致などがある可能性が高いため、reduce関数は継承元の抽象インターフェイスと一致していません...したがって、オーバーライドされていません。デフォルトでは、reduceを使用しますがIdentityReducer、これは何もしません(これはあなたが経験していることです)。

実際にオーバーライドしていることを確認するには、次を追加し@overrideます。

@override
public void reduce(Text key, Iterator<Text> values, Context context)

関数のシグネチャが一致しないため、これはエラーをスローします。これが問題の診断に役立つことを願っています。

于 2012-07-31T21:05:59.470 に答える
1
  • 私は現在、新しいAPIを使用hadoop-core-1.0.3.jarして作成しようとしましたが、Map Reduceなぜ機能しなかったのかわかりません
  • このプログラムはHadoopinActionコードの一部であり、私はこの本でHadoopを学んでいます。
  • 同じmap reduceプログラムをで実行した場合old API syntax、それは完全に正常に機能します。
  • コードは次のようになります(Combinerが含まれているので、最初にCombinerの前にテストしていました)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;


public class AveragingWithCombiner extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        static enum ClaimsCounters { MISSING, QUOTED };

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {

            String fields[] = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];

            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                output.collect(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    public static class Combine extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {

            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String fields[] = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            output.collect(key, new Text(sum + "," + count));
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, DoubleWritable> output,
                           Reporter reporter) throws IOException {

            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                String fields[] = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            output.collect(key, new DoubleWritable(sum/count));
        }
    }

    public int run(String[] args) throws Exception {
        // Configuration processed by ToolRunner
        Configuration conf = getConf();

        // Create a JobConf using the processed conf
        JobConf job = new JobConf(conf, AveragingWithCombiner.class);

        // Process custom command-line options
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        // Specify various job-specific parameters
        job.setJobName("AveragingWithCombiner");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Submit the job, then poll for progress until the job is complete
        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Let ToolRunner handle generic command-line options
        int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);

        System.exit(res);
    }
}
于 2012-07-31T21:27:41.010 に答える