1

AWS EMR で実行されている既存の map reduce ジョブがあります。これは、数十億行のログを処理し、いくつかの計算を行ってマッパーから (キー、値) ペアを形成します。これらの計算は非常に時間がかかるため、これらの計算の中間ステップの出力を他のマップ削減ジョブで使用する必要があります。したがって、既存のジョブに影響を与えずに (つまり、現在のマッパーまたはリデューサーを変更せずに)、計算の出力をタップして s3 にアップロードしたいと考えています。アップロードする前に、まずこれらの行をローカルの一時ファイルに集め、ファイルが十分に大きくなったら、このファイルを s3 にアップロードします。

問題は - レデューサーとは異なり、マッパーはキーに基づいてデータをソートできません。ファイル名の競合がないように、異なるマッパーからデータをアップロードするための s3 の一意のファイル名をどのように工夫すればよいですか?

私はJavaを使用しています。マッパークラスターIDを取得するか、ランダム番号を生成する方法があれば、問題も解決できます。すべてのマッパーに固有のもの (やり方がわからない?)

4

1 に答える 1

0

現在実行中のHadoopタスクのタスク試行IDを取得できます。これは、すべてのマッパーで一意であるため、ファイル名として使用できます。次の方法で試行IDを取得します。

public static String getAttemptId(Configuration conf) throws IllegalArgumentException
   {
       if (conf == null) {
           throw new NullPointerException("conf is null");
       }

       String taskId = conf.get("mapred.task.id");
       if (taskId == null) {
           throw new IllegalArgumentException("Configutaion does not contain the property mapred.task.id");
       }

       String[] parts = taskId.split("_");
       if (parts.length != 6 ||
               !parts[0].equals("attempt") ||
               (!"m".equals(parts[3]) && !"r".equals(parts[3]))) {
           throw new IllegalArgumentException("TaskAttemptId string : " + taskId + " is not properly formed");
       }

       return parts[4] + "-" + parts[5];
   }
于 2012-12-26T23:51:14.213 に答える