「SSaikia_JtheRocker」で説明されているように、マッパー タスクは、HDFS ブロックの論理分割の総数に従って作成されます。質問 #3 に何か追加したいと思います。「1 つの入力ファイルがマッパー間でどのように分散されますか?また、1 つのマッパーの出力が複数のレデューサー間で分散されます (これはフレームワークによって行われるか、変更できます)?」たとえば、ファイル内の単語数をカウントする単語カウント プログラムを以下に示します。
#
public class WCMapper は Mapper を拡張します {
@Override
public void map(LongWritable key, Text value, Context context) // Context context is output
throws IOException, InterruptedException {
// value = "How Are You"
String line = value.toString(); // This is converting the Hadoop's "How Are you" to Java compatible "How Are You"
StringTokenizer tokenizer = new StringTokenizer (line); // StringTokenizer returns an array tokenizer = {"How", "Are", "You"}
while (tokenizer.hasMoreTokens()) // hasMoreTokens is a method in Java which returns boolean values 'True' or 'false'
{
value.set(tokenizer.nextToken()); // value's values are overwritten with "How"
context.write(value, new IntWritable(1)); // writing the current context to local disk
// How, 1
// Are, 1
// You, 1
// Mapper will run as many times as the number of lines
}
}
}
#
上記のプログラムでは、「How are you」という行を StringTokenizer で 3 つの単語に分割し、これを while ループで使用すると、単語の数だけマッパーが呼び出されるため、ここでは 3 つのマッパーが呼び出されます。
レデューサーについては、「job.setNumReduceTasks(5);」を使用して、出力を生成するレデューサーの数を指定できます。声明。以下のコード スニペットからアイデアが得られます。
#
public class BooksMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Use programArgs array to retrieve program arguments.
String[] programArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Job job = new Job(conf);
job.setJarByClass(BooksMain.class);
job.setMapperClass(BookMapper.class);
job.setReducerClass(BookReducer.class);
job.setNumReduceTasks(5);
// job.setCombinerClass(BookReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// TODO: Update the input path for the location of the inputs of the map-reduce job.
FileInputFormat.addInputPath(job, new Path(programArgs[0]));
// TODO: Update the output path for the output directory of the map-reduce job.
FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));
// Submit the job and wait for it to finish.
job.waitForCompletion(true);
// Submit and return immediately:
// job.submit();
}
}
#