1

いくつかのジョブを Hadoop に送信する必要があります。これらのジョブはすべて関連しています (同じドライバー クラスによって起動されるのはそのためです) が、互いに完全に独立しています。今、私は次のような仕事を始めます:

int res = ToolRunner.run(new Configuration(), new MapReduceClass(params), args);

ジョブを実行し、リターン コードを取得して、先に進みます。

私がやりたいのは、そのようなジョブをいくつかサブミットして並行して実行し、それぞれのリターン コードを取得することです。

(私にとって) 明らかなアイデアは、それぞれが単一の Hadoop ジョブを担当する複数のスレッドを起動することですが、hadoop にこれを達成するためのより良い方法があるかどうか疑問に思っています。私は並行性を伴うコードを書いた経験がないので、ここで必要でない限り、その複雑さを学ぶことに多くの時間を費やしたくありません。

4

2 に答える 2

3

これは提案かもしれませんが、コードを暗示しているので、私はそれを答えとして置きます。

このコード(個人コード)では、変数を繰り返し処理し、ジョブ(同じジョブ)を数回送信します。

job.waitForCompletion(false)を使用すると、いくつかのジョブを送信するのに役立ちます。

while (processedInputPaths < inputPaths.length) {

    if (processedInputPaths + inputPathsLimit < inputPaths.length) {
        end = processedInputPaths + inputPathsLimit - 1;
    } else {
        end = inputPaths.length - 1;
    }
    start = processedInputPaths;

    Job job = this.createJob(configuration, inputPaths, cycle, start, end, outputPath + "/" + cycle);

    boolean success = job.waitForCompletion(true);

    if (success) {
        cycle++;
        processedInputPaths = end + 1;
    } else {
        LOG.info("Cycle did not end successfully :" + cycle);
        return -1;
    }

}
于 2013-03-01T14:06:25.947 に答える
1

psabbate の回答により、欠けていた API のいくつかの部分を見つけることができました。これが私がそれを解決した方法です:

ドライバー クラスで、次のようなコードでジョブを開始します。

List<RunningJob> runningJobs = new ArrayList<RunningJob>();
for (String jobSpec: jobSpecs) {
    // Configure, for example, a params map that gets passed into the MR class's constructor
    ToolRunner.run(new Configuration(), new MapReduceClass(params, runningJobs), null);
}

for (RunningJob rj: runningJobs) {
    System.err.println("Waiting on job "+rj.getID());
    rj.waitForCompletion();
}

次に、MapReduceClass でプライベート変数List<RunningJob> runningJobsを定義し、コンストラクターを次のように定義します。

public MergeAndScore(Map<String, String> p, List<RunningJob> rj) throws IOException {
    params = Collections.unmodifiableMap(p);
    runningJobs = rj;
}

そして、呼び出すrun()メソッドで、あなたを定義し、ジョブを送信しますToolRunnerJobConf

JobClient jc = new JobClient();
jc.init(conf);
jc.setConf(conf);
runningJobs.add(jc.submitJob(conf));

これにより、すぐに戻り、ドライバークラスrun()のオブジェクトを介してジョブにアクセスできます。runningJobs

私は古いバージョンの Hadoop で作業しているため、セットアップによってはjc.init(conf)and/orjc.setConf(conf)が必要な場合とそうでない場合がありますが、おそらく少なくとも 1 つが必要であることに注意してください。

于 2013-03-01T14:39:26.253 に答える