1

すべての削減タスクの結果を使用して集計を実行する必要があります。基本的に、reduce タスクは合計とカウントと値を見つけます。すべての合計とカウントを追加して、最終的な平均を求める必要があります。

conf.setIntreduce で使ってみました。しかし、メイン関数からアクセスしようとすると失敗します

class Main {

public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {
        int i = 0;
        int fd = 0, fc = 0;
        fd = context.getConfiguration().getInt("fd", -1);
        fc = context.getConfiguration().getInt("fc", -1);
        //when I check the value of fd, fc here they are fine. fc fd is shared across all reduce tasks and the updated value is seen by all reduce task. Only main function doesnt seem to have access to it.
    }
}

public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    conf.setInt("fc", 5);

    Job job = new Job(conf, "Flight Data");
    job.setJarByClass(FlightData.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    job.setSortComparatorClass(KeyComparator.class);


    job.setNumReduceTasks(10);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);


    flightCount = job.getConfiguration().getInt("fc", -1);
    flightDelay = job.getConfiguration().getInt("fd", -1);
    //here when I access fc, fd, I get back 5 & 5
    System.out.println("Final " + flightCount +" " + flightDelay+ " " + flightDelay/flightCount);
}
4

2 に答える 2

0

新しいAPIrun()を使用して、マッパーとレデューサーのをオーバーライドします。org.apache.hadoop.mapreduceこれらの方法では、各マッパーまたはレデューサーから累積合計/カウントを出力できます。

また、複数のマッパーによって生成されたすべての合計のグローバル合計を取得するには、レデューサーカウントを1に制限する必要があります。

より明確にするために、以下のコードを参照してください。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

    /**
     * This is Mapper.
     * 
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {
                // say that you need to sum up the value part
                sum+= Double.valueOf(value);
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }

            // emit out the sum per mapper
            outputKey.set(sum);
            context.write(outputKey, outputValue);// Notice that the outputValue is empty
            cleanup(context);

        }
    }

    /**
     * This is Reducer.
     * 
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {


            // summation of values from each mapper
            sum += Double.valueOf(key.toString());

        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }

            // emit out the global sums
            outputKey.set(sum);
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();

            // output key and value separator is empty as in final output only
            // key is emitted and value is empty
            conf.set("mapred.textoutputformat.separator", "");

            // Configuring mapred to have just one reducer as we need to find
            // single sum values from all the inputs
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            conf.setInt("mapred.reduce.tasks", 1);

            Job job = new Job(conf);

            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

    }

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out
                    .println("Usage: AggregationExample <comma sparated list of input directories> <output dir>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }

}

このアプローチを問題にうまくマッピングすることができます。

于 2013-02-25T18:21:15.733 に答える
0

解決策を見つけました。カウンターを利用しました

http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/

public class FlightData {

//enum for counters used by reducers
public static enum FlightCounters {
    FLIGHT_COUNT,
    FLIGHT_DELAY;
}
public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {


        delay1 = Float.parseFloat(origin[5]);
        delay2 = Float.parseFloat(dest[5]);
        context.getCounter(FlightCounters.FLIGHT_COUNT).increment(1);
        context.getCounter(FlightCounters.FLIGHT_DELAY)
        .increment((long) (delay1 + delay2));

    }
}
public static void main(String[] args) throws Exception{
    float flightCount, flightDelay;
    job.waitForCompletion(true);
    //get the final results updated in counter by all map and reduce tasks
    flightCount = job.getCounters()
            .findCounter(FlightCounters.FLIGHT_COUNT).getValue();
    flightDelay = job.getCounters()
            .findCounter(FlightCounters.FLIGHT_DELAY).getValue();
}

}

于 2013-02-28T17:54:17.163 に答える