1

現在、2 番目のジョブが最初のジョブの出力を分散キャッシュに追加する必要がある 2 つの Hadoop ジョブがあります。現在、手動で実行しているため、最初のジョブが終了したら、出力ファイルを引数として 2 番目のジョブに渡し、そのドライバーがそれをキャッシュに追加します。

最初のジョブは単純なマップのみのジョブであり、両方のジョブを順番に実行したときに 1 つのコマンドを実行できることを望んでいました。

最初のジョブの出力を分散キャッシュに入れて、2 番目のジョブに渡すことができるようにするためのコードを手伝ってくれる人はいますか?

ありがとう

編集:これはジョブ1の現在のドライバーです:

public class PlaceDriver {

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: PlaceMapper <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "Place Mapper");
    job.setJarByClass(PlaceDriver.class);
    job.setMapperClass(PlaceMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

これは job2 のドライバーです。ジョブ 1 の出力は、最初の引数としてジョブ 2 に渡され、キャッシュにロードされます。

public class LocalityDriver {

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: LocalityDriver <cache> <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "Job Name Here");
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
    job.setNumReduceTasks(1); //TODO: Will change
    job.setJarByClass(LocalityDriver.class);
    job.setMapperClass(LocalityMapper.class);
    job.setCombinerClass(TopReducer.class);
    job.setReducerClass(TopReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4

4 に答える 4

1

同じメインに 2 つのジョブ オブジェクトを作成します。他のものを実行する前に、最初のものが完了するのを待ちます。

public class DefaultTest extends Configured implements Tool{


    public int run(String[] args) throws Exception {

        Job job = new Job();

        job.setJobName("DefaultTest-blockx15");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setNumReduceTasks(15);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(DefaultTest.class);

        job.waitForCompletion(true):

                job2 = new Job(); 

                // define your second job with the input path defined as the output of the previous job.


        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        ToolRunner.run(new DefaultTest(), otherArgs);
    }
 }
于 2012-04-30T01:42:31.977 に答える
0

MapReduce でのジョブ チェーンは、かなり一般的なシナリオです。オープンソースの MapReduce ワークフロー管理ソフトウェアであるcascadingを試すことができます。そして、ここで進行中のカスケードについていくつかの議論があります。または、ここで同様のディスカッションを確認できます。

于 2012-04-26T02:11:39.673 に答える
0

簡単な答えは、両方のメイン メソッドのコードを 2 つの別々のメソッドに抽出することです。たとえば、次のようにメイン メソッドでそれらboolean job1()boolean job2() 呼び出します。

public static void main(String[] args) throws Exception {
   if (job1()) {
      jobs2();
   }
}

ここで、 と の呼び出しの戻り値はjob1job2呼び出しの結果ですjob.waitForCompletion(true)

于 2012-04-25T08:49:18.993 に答える