2

あるステップの出力が次のステップの入力に供給されるように、複数の Hadoop ジョブをチェーンする方法を見つけようとしています。グーグルで見つけた多くのことは、単一のスレッドから一度に1つずつ呼び出して完了を待つか、 Job.addDependingJob() を使用してから送信する必要があることを示しています。後者を選択しましたが、前のジョブが終了した後に後続のジョブを実行できないようです。

これが私のコードです:

List<Job> jobs = new ArrayList<Job>();

for(int i = 0; i < stepCount; i++) {
    JobConf jc = new JobConf(clusterConfig);

    ... set up mappers and reducers here ...

    ... set up input and output paths here ...

    Job j = new Job(jc);
    j.addDependingJob(jobs.get(jobs.size() - 1);
    jobs.add(j);
}

for(Job j : Jobs) {
    JobClient client = new JobClient();
    client.init(j.getJobConf());
    client.submit(j.getJobConf());
}

すべてのジョブが一度に実行され、次のような出力が得られます。

  • ジョブ jar ファイルが設定されていません。ユーザー クラスが見つからない可能性があります。JobConf(Class) または JobConf#setJar(String) を参照してください。
  • 処理する合計入力パス: 1
  • 引数の解析には GenericOptionsParser を使用します。アプリケーションは、同じツールを実装する必要があります。
  • ジョブ jar ファイルが設定されていません。ユーザー クラスが見つからない可能性があります。JobConf(Class) または JobConf#setJar(String) を参照してください。
  • プロセスへの合計入力パス: 0
  • 引数の解析には GenericOptionsParser を使用します。アプリケーションは、同じツールを実装する必要があります。
  • ジョブ jar ファイルが設定されていません。ユーザー クラスが見つからない可能性があります。JobConf(Class) または JobConf#setJar(String) を参照してください。
  • プロセスへの合計入力パス: 0
  • 引数の解析には GenericOptionsParser を使用します。アプリケーションは、同じツールを実装する必要があります。
  • ジョブ jar ファイルが設定されていません。ユーザー クラスが見つからない可能性があります。JobConf(Class) または JobConf#setJar(String) を参照してください。
  • プロセスへの合計入力パス: 0

私は何を間違っていますか?

注: Hadoop 0.20.205 を使用しています

明確にするために編集:ジョブチェーンをクラスターに送信し、ジョブチェーンが完了するのを待たずにすぐに戻ることができる必要があります。

4

3 に答える 3

2

ジョブ間の依存関係を設定するには、JobControl を使用する必要があります。指定されたコードでは依存関係が設定されていないため、ジョブは順番に実行されるのではなく、並行して実行されます。より複雑なワークフローがある場合は、Oozie を使用できます。

これについては、興味深い記事があります

于 2013-03-21T16:20:39.690 に答える
1

これに対処してから数年が経ちましたが、いくつかのことがわかります。

  1. あなたのエラーは、ジョブ間の連鎖とは何の関係もありません。連鎖することを心配する前に、単一のジョブを実行できることを確認してください。
  2. Jobcontrol は一連のジョブをジョブ トラッカーに送信しません (少なくとも 2010 年には送信しませんでした)。上流のジョブがいつ完了したかを確認し、次のジョブを自動的にジョブ トラッカーに送信するツールです。実行して終了することはできません。
  3. ジョブで submit を呼び出すべきではありません。それはそれらを実行するように提出します。そこのどこかでジョブ制御に制御を渡す必要があります。

これは紛らわしいと思い、https://github.com/kevinpet/jobcontrolに独自の DAG ヘルパーを書き始めました。役に立つかもしれないし、役に立たないかもしれません。

于 2013-03-22T06:49:57.127 に答える
-1

以下は map reduce ジョブをチェーンする方法です。ここでは、最初のジョブの出力に対して 2 番目のジョブを実行しています。

        Job job1 = new Job(conf, "job1");
    Job job2 = new Job(conf,"job2");
    job1.setJarByClass(driver.class);
    job1.setMapperClass(Map.class);
    job1.setCombinerClass(Reduce.class);
    job1.setReducerClass(Reduce.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);
    String outputpath="/user/hadoop/firstjoboutput";
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(outputpath));
    job1.waitForCompletion(true);

    job2.setJarByClass(driver.class);
    job2.setMapperClass(SecondMap.class);
    job2.setReducerClass(SecondReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job2, new Path(outputpath));
    String finaloutput="/user/hadoop/finaloutput";
    FileOutputFormat.setOutputPath(job2, new Path(finaloutput));


    System.exit(job2.waitForCompletion(true) ? 0 : 1);
于 2013-03-22T06:04:40.687 に答える