Hadoop ジョブを毎日実行するときに、既存の出力ディレクトリを上書き/再利用したいと考えています。実際には、出力ディレクトリには、毎日のジョブ実行結果の要約出力が格納されます。同じ出力ディレクトリを指定すると、「出力ディレクトリは既に存在します」というエラーが表示されます。
この検証をバイパスする方法は?
Hadoop ジョブを毎日実行するときに、既存の出力ディレクトリを上書き/再利用したいと考えています。実際には、出力ディレクトリには、毎日のジョブ実行結果の要約出力が格納されます。同じ出力ディレクトリを指定すると、「出力ディレクトリは既に存在します」というエラーが表示されます。
この検証をバイパスする方法は?
ジョブを実行する前にディレクトリを削除するのはどうですか?
シェル経由でこれを行うことができます:
hadoop fs -rmr /path/to/your/output/
または Java API 経由:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
Jungblut の答えは、あなたの直接的な解決策です。私は自動化されたプロセスが物を削除することを決して信用していないので(個人的には)、別の方法を提案します:
上書きしようとするのではなく、ジョブの出力名を実行時間を含めて動的にすることをお勧めします。
「 」のようなもの/path/to/your/output-2011-10-09-23-04/
。このようにして、再度アクセスする必要がある場合に備えて、古いジョブの出力を保持できます。毎日 10 件以上のジョブを実行する私のシステムでは、出力を次のように構成し/output/job1/2011/10/09/job1out/part-r-xxxxx
ます/output/job1/2011/10/10/job1out/part-r-xxxxx
。
Hadoop TextInputFormat
(あなたが使用していると思います) では、既存のディレクトリを上書きすることはできません。おそらく、あなた (およびあなたのクラスター) が懸命に取り組んできたものを誤って削除してしまったことに気付いたときの苦痛を言い訳するためでしょう。
ただし、出力フォルダーをジョブによって上書きすることが確実な場合は、次のTextOutputFormat
ように少し変更するのが最もクリーンな方法だと思います。
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
ここで、overwrite=true でFSDataOutputStream
( ) を作成しています。fs.create(file, true)
時間ごとに実行ごとに出力サブディレクトリを作成できます。たとえば、ユーザーからの出力ディレクトリを期待していて、次のように設定するとします。
FileOutputFormat.setOutputPath(job, new Path(args[1]);
これを次の行で変更します。
String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
メインクラスに設定を追加する必要があります:
//Configuring the output path from the filesystem into the job
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//auto_delete output dir
OutputPath.getFileSystem(conf).delete(OutputPath);