21

http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputsMultipleOutputsの例のようにクラスを使用しようとしました.html

ドライバーコード

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Wordcount");
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
            Text.class, IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

レデューサーコード

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;
    public void setup(Context context){
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key,result);
    }
    public void cleanup(Context context)  {
         try {
            mos.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         }
}

レデューサーの出力の名前がtext-r-00000に変更されていることがわかります

しかし、ここでの問題は、空のpart-r-00000ファイルも取得していることです。これはMultipleOutputsの動作が期待される方法ですか、それともコードに問題がありますか?ご意見をお聞かせください。

私が試したもう1つの方法は、FileSystemクラスを使用して出力フォルダーを反復処理し、partで始まるすべてのファイルの名前を手動で変更することです。

最善の方法は何ですか?

FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
} 
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}
4

2 に答える 2

21

を使用している場合でもMultipleOutputs、デフォルトOutputFormat(私はそれが使用されていると思いますTextOutputFormat)がまだ使用されているため、表示されているこれらのpart-r-xxxxxファイルを初期化して作成します。

それらが空であるという事実は、を使用しているために何もしていないcontext.writeためですMultipleOutputs。ただし、初期化中に作成されることを妨げるものではありません。

それらを取り除くために、あなたOutputFormatはあなたがどんな出力も期待していないと言うためにあなたを定義する必要があります。あなたはこのようにそれを行うことができます:

job.setOutputFormat(NullOutputFormat.class);

このプロパティを設定すると、パーツファイルがまったく初期化されないようにする必要がありますが、出力はで取得されますMultipleOutputs

また、おそらくLazyOutputFormatこれを使用して、出力ファイルがデータがある場合にのみ作成され、空のファイルを初期化しないようにすることもできます。あなたは私をこのようにすることができます:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

Reducerプロトタイプで使用していることに注意してください。MultipleOutputs.write(String namedOutput, K key, V value)これは、に基づいて生成されるデフォルトの出力パスを使用するだけnamedOutputです{namedOutput}-(m|r)-{part-number}。出力ファイル名をより細かく制御したい場合はMultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath)、キー/値に基づいて実行時に生成されるファイル名を取得できるプロトタイプを使用する必要があります。

于 2013-01-28T04:39:07.013 に答える
11

これは、出力ファイルのベース名を変更するためにDriverクラスで行う必要があるすべてです。 job.getConfiguration().set("mapreduce.output.basename", "text"); したがって、これにより、ファイルは「text-r-00000」と呼ばれるようになります。

于 2015-02-03T12:08:55.257 に答える