MapReduce と乱数生成を使用して pi の推定値を生成しようとしています。50 個のマップ ジョブを開始するために、プロセスに 50 個の小さなファイルをフィードしています。各ファイルには「hello」のみが含まれています。Mapper ステージの出力には、次のように 50 行が表示されます。
「こんにちは」1 「こんにちは」2 「こんにちは」3 ...
もちろん、これは私が考えていたことではありません。私のプログラムは、私が望んでいたように、明らかに入力を無視していません。入力が提供される唯一の目的は、各ファイルのマップ ジョブを開始することです。
以下は私の MapReduce コードです。今のところ、jar ファイルを実行するときに -D コマンド ライン引数を使用して、reduce フェーズを無視しています。
import java.util.Random;
import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PiCalc extends Configured implements Tool {
public static class MapClass extends Mapper<IntWritable, IntWritable, DoubleWritable, DoubleWritable> {
// private FloatWritable insideCircle = new FloatWritable();
// private FloatWritable insideSquare = new FloatWritable();
public void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {
int ITERATIONS = 1000;
int inCircle = 0;
int n = 0;
double x, y;
for (int i = 0; i < ITERATIONS ; i++)
{
x = Math.random();
y = Math.random();
n++;
if ( x*x + y*y <= 1 ){
inCircle++;
}
}
// insideCircle.set(inCircle);
// insideSquare.set(n);
// insideCircle.set(99);
// insideCircle.set(88);
context.write(new DoubleWritable(inCircle), new DoubleWritable(n));
}
}
public static class Reduce extends Reducer<FloatWritable, FloatWritable, Text, FloatWritable> {
public void reduce(Iterable<FloatWritable> key, Iterable<FloatWritable> values, Context context)
throws IOException, InterruptedException {
float pi = 0;
float numerator = 0;
float denominator = 0;
for (FloatWritable k:key){
numerator += k.get();
}
for (FloatWritable val:values){
denominator += val.get();
}
pi = 4 * numerator / denominator;
context.write(new Text("Pi: "), new FloatWritable(pi));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
// conf.set("key.value.separator.in.input.line", ",");
Job job = new Job(conf, "PiCalc");
job.setJarByClass(PiCalc.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("PiCalc");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setNumMapTasks(50);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new PiCalc(), args);
System.exit(res);
}
}
どんな提案でも大歓迎です。ありがとう!