1

小さな mapreduce プログラムを開発しました。プロセス ログを開くと、フレームワークによって 1 つのマップと 2 つのレデューサーが作成されていることがわかりました。入力用のファイルは 1 つだけで、出力ファイルは 2 つありました。今教えてください

1) Number of mapper and reducer are created by framework or it can be changed?
2) Number of output files always equal to number of reducers? i.e. each reducer
   creates its   own output file?
3) How one input file is distributed among mappers? And output of one mapper is 
   distributed among multiple reducers (this is done by framework or you can change)?
4) How to manage when multiple input files are there i.e. A directory ,
   containing input files?

これらの質問に答えてください。私はMapReduceの初心者です。

4

2 に答える 2

0

「SSaikia_JtheRocker」で説明されているように、マッパー タスクは、HDFS ブロックの論理分割の総数に従って作成されます。質問 #3 に何か追加したいと思います。「1 つの入力ファイルがマッパー間でどのように分散されますか?また、1 つのマッパーの出力が複数のレデューサー間で分散されます (これはフレームワークによって行われるか、変更できます)?」たとえば、ファイル内の単語数をカウントする単語カウント プログラムを以下に示します。

#

public class WCMapper は Mapper を拡張します {

@Override
public void map(LongWritable key, Text value, Context context) // Context context is output
        throws IOException, InterruptedException {

    // value = "How Are You"
    String line = value.toString(); // This is converting the Hadoop's "How Are you" to Java compatible "How Are You"

    StringTokenizer tokenizer = new StringTokenizer (line); // StringTokenizer returns an array tokenizer = {"How", "Are", "You"}

    while (tokenizer.hasMoreTokens()) // hasMoreTokens is a method in Java which returns boolean values 'True' or 'false'
    {
        value.set(tokenizer.nextToken()); // value's values are overwritten with "How" 
        context.write(value, new IntWritable(1)); // writing the current context to local disk
        // How, 1
        // Are, 1
        // You, 1
        // Mapper will run as many times as the number of lines 
    }
}

}

#

上記のプログラムでは、「How are you」という行を StringTokenizer で 3 つの単語に分割し、これを while ループで使用すると、単語の数だけマッパーが呼び出されるため、ここでは 3 つのマッパーが呼び出されます。

レデューサーについては、「job.setNumReduceTasks(5);」を使用して、出力を生成するレデューサーの数を指定できます。声明。以下のコード スニペットからアイデアが得られます。

#

public class BooksMain {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Use programArgs array to retrieve program arguments.
    String[] programArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    Job job = new Job(conf);
    job.setJarByClass(BooksMain.class);
    job.setMapperClass(BookMapper.class);
    job.setReducerClass(BookReducer.class);
    job.setNumReduceTasks(5);

// job.setCombinerClass(BookReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // TODO: Update the input path for the location of the inputs of the map-reduce job.
    FileInputFormat.addInputPath(job, new Path(programArgs[0]));
    // TODO: Update the output path for the output directory of the map-reduce job.
    FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));

    // Submit the job and wait for it to finish.
    job.waitForCompletion(true);
    // Submit and return immediately: 
    // job.submit();
}

}

#
于 2014-04-02T16:54:01.843 に答える