12

最近、私はアマゾン ウェブ サービス (AWS) を使用していますが、この件に関するドキュメントがあまりないことに気付いたので、私のソリューションを追加しました。

Amazon Elastic MapReduce (Amazon EMR) を使用してアプリケーションを作成していました。計算が終了した後、作成されたファイルに対していくつかの作業を実行する必要があったため、ジョブ フローがいつ作業を完了したかを知る必要がありました。

これは、ジョブ フローが完了したかどうかを確認する方法です。

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
    .withJobFlowStates("COMPLETED");

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
JobFlowDetail detail = jobs.get(0);

detail.getJobFlowId(); //the id of one of the completed jobs

特定のジョブ ID を検索して、DescribeJobFlowsRequestそのジョブが終了したか失敗したかを確認することもできます。

他の人に役立つことを願っています。

4

3 に答える 3

3

私もこの問題に遭遇しました。これが私が今思いついた解決策です。完璧ではありませんが、お役に立てば幸いです。参考までに、私は Java 1.7 と AWS Java SDK バージョン 1.9.13 を使用しています。

このコードは、厳密に言えば手順ではなく、クラスターが終了するのを待っていることを前提としていることに注意してください。すべてのステップが完了したときにクラスターが終了する場合、これは問題ありませんが、ステップの完了後も存続するクラスターを使用している場合、これはあまり役に立ちません。

また、このコードはクラスターの状態の変化を監視してログに記録し、さらにクラスターがエラーで終了したかどうかを診断し、終了した場合は例外をスローすることに注意してください。

private void yourMainMethod() {
    RunJobFlowRequest request = ...;

    try {
        RunJobFlowResult submission = emr.runJobFlow(request);
        String jobFlowId = submission.getJobFlowId();
        log.info("Submitted EMR job as job flow id {}", jobFlowId);

        DescribeClusterResult result = 
            waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS);
        diagnoseClusterResult(result, jobFlowId);
    } finally {
        emr.shutdown();
    }
}

private DescribeClusterResult waitForCompletion(
             AmazonElasticMapReduceClient emr, String jobFlowId,
             long sleepTime, TimeUnit timeUnit)
        throws InterruptedException {
    String state = "STARTING";
    while (true) {
        DescribeClusterResult result = emr.describeCluster(
                new DescribeClusterRequest().withClusterId(jobFlowId)
        );
        ClusterStatus status = result.getCluster().getStatus();
        String newState = status.getState();
        if (!state.equals(newState)) {
            log.info("Cluster id {} switched from {} to {}.  Reason: {}.",
                     jobFlowId, state, newState, status.getStateChangeReason());
            state = newState;
        }

        switch (state) {
            case "TERMINATED":
            case "TERMINATED_WITH_ERRORS":
            case "WAITING":
                return result;
        }

        timeUnit.sleep(sleepTime);
    }
}

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) {
    ClusterStatus status = result.getCluster().getStatus();
    ClusterStateChangeReason reason = status.getStateChangeReason();
    ClusterStateChangeReasonCode code = 
        ClusterStateChangeReasonCode.fromValue(reason.getCode());
    switch (code) {
    case ALL_STEPS_COMPLETED:
        log.info("Completed EMR job {}", jobFlowId);
        break;
    default:
        failEMR(jobFlowId, status);
    }
}

private static void failEMR(String jobFlowId, ClusterStatus status) {
    String msg = "EMR cluster run %s terminated with errors.  ClusterStatus = %s";
    throw new RuntimeException(String.format(msg, jobFlowId, status));
}
于 2014-12-20T00:46:40.023 に答える
1

ジョブ フローが完了すると、クラスターが停止し、HDFS パーティションが失われます。データの損失を防ぐために、Amazon S3 に結果を保存するようにジョブ フローの最後のステップを設定します。

JobFlowInstancesDetail:パラメーターが TRUE に設定されている場合KeepJobFlowAliveWhenNoSteps、ジョブ フローは、ステップが完了するとシャットダウンするのではなく、WAITING 状態に移行します。

各ジョブ フローでは、最大 256 のステップが許可されます。

仕事に時間がかかる場合は、結果を定期的に保存することをお勧めします。

簡単に言うと、いつ完了したかを知る方法はありません。代わりに、ジョブの一部としてデータを保存する必要があります。

于 2012-07-08T07:37:47.423 に答える
1

--wait-for-stepsジョブフロー作成時にオプションを使用します。

./elastic-mapreduce --create \
...
 --wait-for-steps \
...
于 2013-03-24T07:04:50.287 に答える