1

MapReduce と乱数生成を使用して pi の推定値を生成しようとしています。50 個のマップ ジョブを開始するために、プロセスに 50 個の小さなファイルをフィードしています。各ファイルには「hello」のみが含まれています。Mapper ステージの出力には、次のように 50 行が表示されます。

「こんにちは」1 「こんにちは」2 「こんにちは」3 ...

もちろん、これは私が考えていたことではありません。私のプログラムは、私が望んでいたように、明らかに入力を無視していません。入力が提供される唯一の目的は、各ファイルのマップ ジョブを開始することです。

以下は私の MapReduce コードです。今のところ、jar ファイルを実行するときに -D コマンド ライン引数を使用して、reduce フェーズを無視しています。

import java.util.Random;
import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

    public class PiCalc extends Configured implements Tool {

        public static class MapClass extends Mapper<IntWritable, IntWritable, DoubleWritable, DoubleWritable> {
//          private FloatWritable insideCircle = new FloatWritable();
//          private FloatWritable insideSquare = new FloatWritable();
            public void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {

            int ITERATIONS = 1000;                                            
            int inCircle = 0;
            int n = 0;

             double x, y;                                                 

             for (int i = 0; i < ITERATIONS ; i++)                         
             {                                                           
                x = Math.random();                       
                y = Math.random();                                               

                n++;                                                       

                if ( x*x + y*y <= 1 ){             
                   inCircle++;   
                }       
             }   

//           insideCircle.set(inCircle);
//           insideSquare.set(n);

//           insideCircle.set(99);
//           insideCircle.set(88);

             context.write(new DoubleWritable(inCircle), new DoubleWritable(n));
            }
        }   

        public static class Reduce extends Reducer<FloatWritable, FloatWritable, Text, FloatWritable> {

            public void reduce(Iterable<FloatWritable> key, Iterable<FloatWritable> values, Context context) 
                    throws IOException, InterruptedException {

                float pi = 0;
                float numerator = 0;
                float denominator = 0;

                for (FloatWritable k:key){   
                    numerator += k.get();      
                }

                for (FloatWritable val:values){  
                    denominator += val.get();        
                }

                pi = 4 * numerator / denominator;

                context.write(new Text("Pi: "), new FloatWritable(pi));
            }
        }   
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();      
//          conf.set("key.value.separator.in.input.line", ",");
            Job job = new Job(conf, "PiCalc");        
            job.setJarByClass(PiCalc.class);            
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("PiCalc");     
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);   
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
//          job.setNumMapTasks(50);
            System.exit(job.waitForCompletion(true)?0:1);
            return 0;
        }  
        public static void main(String[] args) throws Exception { 
            int res = ToolRunner.run(new Configuration(), new PiCalc(), args);       
            System.exit(res);
        }
    }

どんな提案でも大歓迎です。ありがとう!

4

0 に答える 0