3

私のMapReduce仕事は日付ごとにデータを処理し、出力を特定のフォルダー構造に書き込む必要があります。現在の期待は、次の構造でアウトプットを生成することです。

2013
    01
    02
    ..

2012
    01
    02
    ..

いつでも最大 12 か月のデータしか取得できないため、MultipleOutputsクラスを使用して、ドライバーで次の関数を使用して 12 の出力を作成しています。

public void createOutputs(){
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;

    // Create multiple outputs for last 12 months
    // TODO make 12 configurable
    for(int i = 0; i < 12; ++i ){
        //Get month and add 1 as month is 0 based index
        int month = c.get(Calendar.MONTH)+1; 
        //Add leading 0
        monthStr = month > 10 ? "" + month : "0" + month ;  
        // Generate path string in the format 2013/03/etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr );  
        // Move to previous month
        c.add(Calendar.MONTH, -1); 
    }
}

レデューサーでは、生成された出力を適切なディレクトリに移動するためのクリーンアップ機能を追加しました。

protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}

問題: 出力が _temporary ディレクトリから出力ディレクトリに移動される前に、リデューサーのクリーンアップ機能が実行されます。このため、すべてのデータがまだ _temporary ディレクトリにあるため、上記の関数は実行時に出力を認識しません。

目的の機能を実現するための最良の方法は何ですか? 洞察に感謝します。

次のことを考えます。

  • カスタム outputcommitter を使用する方法はありますか?
  • 別のジョブをチェーンする方が良いですか、それともやり過ぎですか?
  • 私が気付いていないより簡単な代替手段はありますか..

cleanup関数からのファイル構造のサンプル ログを次に示します。

MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000
4

2 に答える 2

13

副業は必要ありません。現在、MultipleOutputs を使用して、プログラムの 1 つで大量の出力ディレクトリを作成しています。30 以上のディレクトリがあるにもかかわらず、使用できる MultipleOutputs オブジェクトは 2 つだけです。これは、書き込み時に出力ディレクトリを設定できるため、必要な場合にのみ決定できます。実際には複数の namedOutput が必要なのは、異なる形式で出力したい場合だけです (例: キー: Text.class、値: Text.class を持つものと、キー: Text.class および値: IntWritable.class を持つもの)。

設定:

MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);

減速機のセットアップ:

mout = new MultipleOutputs<Text, Text>(context);

レデューサーで mout を呼び出す:

String key; //set to whatever output key will be
String value; //set to whatever output value will be
String outputFileName; //set to absolute path to file where this should write

mout.write("Output",new Text(key),new Text(value),outputFileName);

コーディング中にコードの一部でディレクトリを決定することができます。たとえば、月と年でディレクトリを指定するとします。

int year;//extract year from data
int month;//extract month from data
String baseFileName; //parent directory to all outputs from this job
String outputFileName = baseFileName + "/" + year + "/" + month;

mout.write("Output",new Text(key),new Text(value),outputFileName);

お役に立てれば。

編集: 上記の例の出力ファイル構造:

Base
    2013
        01
        02
        03
        ...
    2012
        01
        ...
    ...
于 2013-11-06T20:09:26.430 に答える
0

ほとんどの場合、クリーンアップで mos を閉じませんでした。

以下のようなマッパーまたはリデューサーの設定がある場合:

public void setup(Context context) {mos = new MultipleOutputs(context);}

以下のように、クリーンアップの開始時に mos を閉じる必要があります。

public void cleanup(Context context ) throws IOException, InterruptedException {mos.close();}
于 2015-06-07T17:21:23.723 に答える