0

これは単純なはずですが、Hadoop JobTracker のインスタンスを取得する方法がわかりません。コンストラクターを呼び出すことができないことに気付いた後、ジョブを送信した後、そのようにインスタンス化しようとしています。

  JobClient client = new JobClient(conf);
  RunningJob runningJob = client.submitJob(conf);    
  JobTracker jobTracker = JobTracker.startTracker(_conf);

jobTracker の行を削除すると、プログラムは問題なく実行されます。そのままにしておくと、その JobConf の JobTracker が既に存在するという例外が発生します。

JobTracker のインスタンスを取得する方法を知っている人はいますか? 現在、hadoop 1.2.1 を使用しています。

編集: JobTracker がすでにリッスンしているポートで JobTracker が開始しようとしているために、例外が発生していると考えています。テスト実行用に別のポートで JobTracker を開始することもできますが、Hadoop クラスター用に 2 つの JobTracker を使用することになり、本来の実行方法ではありません。自分の JobTracker を開始する前に、既存の JobTracker を停止しようとする誘惑にかられます (これは完全にテスト環境であり、システムが他のジョブと共有されている場合、これは実行できません) が、これは間違った道をたどっているようです。

4

1 に答える 1

1

クライアント コードで JobTracker のインスタンスを作成しません。作成済みの JobClient には、実行中の JobTracker と対話するために使用できるすべてのメソッドがあります (submitJob.

編集

残念ながら、成功したタスクのホスト名をクライアント API 経由で取得することはできません。残念なことに、コマンド ライン経由でこれを行うことはできますが、プライベート API 呼び出しを使用するため、このコマンドを呼び出して、次に、stdout を解析します (また、タスクのセットアップ イベントとクリーンアップ イベントも取得します)。

user@host1:~$ /opt/hadoop/default/bin/hadoop job -events job_201311110747_0001 0 100
Task completion events for job_201311110747_0001
Number of events (from 0) are: 4
SUCCEEDED attempt_201311110747_0001_m_000002_0 http://host1:50060/tasklog?plaintext=true&attemptid=attempt_201311110747_0001_m_000002_0
SUCCEEDED attempt_201311110747_0001_m_000000_0 http://host2:50060/tasklog?plaintext=true&attemptid=attempt_201311110747_0001_m_000000_0
SUCCEEDED attempt_201311110747_0001_r_000000_0 http://host3:50060/tasklog?plaintext=true&attemptid=attempt_201311110747_0001_r_000000_0
SUCCEEDED attempt_201311110747_0001_m_000001_0 http://host4:50060/tasklog?plaintext=true&attemptid=attempt_201311110747_0001_m_000001_0 

オプションとして、リフレクション ベースのハッカーを使用してプライベート API を公開し、必要に応じて使用することもできます。参考までに、コードで上記を複製するために必要な API 呼び出しを次に示します (これは、異なるバージョンとの前方互換性または後方互換性がない場合があります)。 Hadoop の - 1.2.1 の場合):

public class JobClientDriver extends Configured implements Tool {
    public static void main(String args[]) throws Exception {
        ToolRunner.run(new JobClientDriver(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobClient client = new JobClient(new JobConf(conf));

        Method method = JobClient.class.getDeclaredMethod("createRPCProxy", InetSocketAddress.class,
                Configuration.class);
        method.setAccessible(true);

        Object rpcClientSubProtocol = method.invoke(client, JobTracker.getAddress(conf), conf);

        Method completeEventsMethod = rpcClientSubProtocol.getClass().getDeclaredMethod("getTaskCompletionEvents",
                JobID.class, int.class, int.class);

        for (Object tceObj : ((Object[]) completeEventsMethod.invoke(rpcClientSubProtocol,
            JobID.forName("job_201311110747_0001"), 0, 100))) {
            TaskCompletionEvent tce = (TaskCompletionEvent) tceObj;
            if (tce.isMapTask()) {
                URI uri = new URI(tce.getTaskTrackerHttp());
                System.err.println(tce.getTaskAttemptId() + " @ " + uri.getHost());
            }
        }

        return 0;
    }
}
于 2013-11-10T00:15:30.320 に答える