私のMapReduce
仕事は日付ごとにデータを処理し、出力を特定のフォルダー構造に書き込む必要があります。現在の期待は、次の構造でアウトプットを生成することです。
2013
01
02
..
2012
01
02
..
等
いつでも最大 12 か月のデータしか取得できないため、MultipleOutputs
クラスを使用して、ドライバーで次の関数を使用して 12 の出力を作成しています。
public void createOutputs(){
Calendar c = Calendar.getInstance();
String monthStr, pathStr;
// Create multiple outputs for last 12 months
// TODO make 12 configurable
for(int i = 0; i < 12; ++i ){
//Get month and add 1 as month is 0 based index
int month = c.get(Calendar.MONTH)+1;
//Add leading 0
monthStr = month > 10 ? "" + month : "0" + month ;
// Generate path string in the format 2013/03/etl
pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
// Add the named output
MultipleOutputs.addNamedOutput(config, pathStr );
// Move to previous month
c.add(Calendar.MONTH, -1);
}
}
レデューサーでは、生成された出力を適切なディレクトリに移動するためのクリーンアップ機能を追加しました。
protected void cleanup(Context context) throws IOException, InterruptedException {
// Custom function to recursively process data
moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}
問題: 出力が _temporary ディレクトリから出力ディレクトリに移動される前に、リデューサーのクリーンアップ機能が実行されます。このため、すべてのデータがまだ _temporary ディレクトリにあるため、上記の関数は実行時に出力を認識しません。
目的の機能を実現するための最良の方法は何ですか? 洞察に感謝します。
次のことを考えます。
- カスタム outputcommitter を使用する方法はありますか?
- 別のジョブをチェーンする方が良いですか、それともやり過ぎですか?
- 私が気付いていないより簡単な代替手段はありますか..
cleanup
関数からのファイル構造のサンプル ログを次に示します。
MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000