0

次のコードを使用して、hadoop で 2 つの連続した M/R ジョブをチェーンしようとしています。基本的に、最初のジョブが完了した直後に、最初のジョブの出力を入力として使用する別のジョブを作成します。しかし、コードは 2 番目のジョブの出力を生成せず、例外もスローしませんでした。どこが間違っている可能性があるかを教えてもらえますか?それは有り難いです。

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: jobStats <in> <out> <job>");
        System.exit(2);
    }


    conf.set("job", otherArgs[2]);
    Job job = new Job(conf, "job count");
    job.setJarByClass(jobStats.class);
    job.setMapperClass(jobMapper.class);
    job.setCombinerClass(jobReducer.class);
    job.setReducerClass(jobReducer.class);

    job.setMapOutputKeyClass(Text.class);        
    job.setMapOutputValueClass(IntWritable.class);           
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    boolean completionStatus1 = job.waitForCompletion(true);
    if (completionStatus1 == true)
    {
        Job job2 = new Job(conf, "job year ranking");
        job2.setJarByClass(jobStats.class);
        job2.setPartitionerClass(ChainedPartitioner.class);
        job2.setGroupingComparatorClass(CompKeyGroupingComparator.class);
        job2.setSortComparatorClass(CompKeyComparator.class);

        job2.setMapperClass(ChainedMapper.class);
        job2.setReducerClass(ChainedReducer.class);
        job2.setPartitionerClass(ChainedPartitioner.class);
        job2.setMapOutputKeyClass(CompositeKey.class);
        job2.setMapOutputValueClass(IntWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        Path outPath = new Path(otherArgs[1] + "part-r-00000"); // this is the hard-coded output of first job
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath))
        {
            FileInputFormat.addInputPath(job2, outPath);
            FileOutputFormat.setOutputPath(job2, new Path("/user/tony/output/today"));

            boolean completionStatus2 = job2.waitForCompletion(true);
            if (completionStatus2 == true)
            {
                fs.delete(outPath, true);
                System.exit(0);
            }
            else System.exit(1);
        }
        else System.exit(1);
    }
}
4

1 に答える 1

0

ChainedMapper および ChainedReducer クラスは、複数のマッパーを 1 つの Map Reduce ジョブにまとめるために使用されます。M1-M2-M3-R-M4-M5 のようなもの。

あなたのケースでは、2 つの完全な map reduce ジョブを続けて実行したいと考えています。2 番目のジョブに実際のマップを指定するだけです。

于 2013-10-29T15:08:46.217 に答える