0

以下のようなMapReduceプログラムがあります

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Sample {

public static class SampleMapper extends MapReduceBase implements
        Mapper<Text, Text, Text, Text> {

    private Text word = new Text();

    @Override
    public void map(Text key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        StringTokenizer itr = new StringTokenizer(value.toString(),",");
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(key, word);
        }
    }
}

public static class SampleReducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        StringBuffer aggregation = new StringBuffer();
        while (values.hasNext()) {
            aggregation.append("|" + values.next().toString());
        }
        result.set(aggregation.toString());
        output.collect(key, result);
    }

}

public static void main(String args[]) throws IOException {
    JobConf conf = new JobConf(Sample.class);
    conf.setJobName("Sample");

    conf.setMapperClass(SampleMapper.class);
    conf.setReducerClass(SampleReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

}
}

私は瓶を作り、出力を得ようとしました。しかし、作成中の出力ファイルは空です。

次のコマンドを使用してジョブを実行しています

hadoop jar mapreduce.jar Sample /tmp/input tmp/output

mapreduce.jar は私がパッケージ化した jar で、入力ファイルは次のようなものです

1 a,b,c
2 e,f
1 x,y,z
2 g

期待される出力

1 a|b|c|x|y|z
2 e|f|g
4

2 に答える 2

0

KeyValueTextInputFormatを入力形式として使用しているため、個別のバイトが見つからないため、行全体の値をキーとして使用していると思います(値は "")。これは、マッパーでの反復がループを通過せず、何も書き出されないことを意味します。構成でプロパティ名mapreduce.input.keyvaluelinerecordreader.key.value.separatorを使用して、区切りバイトとして「」を保持します。

于 2012-10-08T13:34:23.060 に答える
0

構成オブジェクトを JobConf に渡してみてください。 JobConf は Hadoop/hdfs 構成を取得できないと思います。

Configuration configuration=new Configuration();
JobConf jobconf=new JobConf(configuration, exampleClass.class);
conf2.setJarByClass(cls);
.......
于 2012-10-08T11:58:13.787 に答える