Hadoop 0.20.2 (変更不可) を使用しており、入力パスにフィルターを追加したいと考えています。データは次のようになります。
/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2
そして、 trainを含むすべてのファイルを処理したいだけです。
FileInputFormat クラスを見ると、次の使用が提案されています。
FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)
PathFilterはインターフェイスであるため、これが私の問題の始まりです。もちろん、インターフェイスを拡張できますが、まだ実装がありません。その代わりに、私はインターフェースを実装しました:
class TrainFilter implements PathFilter
{
boolean accept(Path path)
{
return path.toString().contains("train");
}
}
TrainFilter を PathFilter として使用すると、コードはコンパイルされますが、実行すると、入力パスが台無しになるため例外が発生します。フィルターを設定しないと、コードは /path1 の下にあるすべてのファイルを実行しますが、フィルターを設定すると、次のエラーがスローされます。
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1
ドライバーコードで設定する方法は次のとおりです。
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);
ここで私が間違っていることの提案はありますか?
編集:問題が見つかりました。PathFilter への最初の呼び出しは常にディレクトリ自体 (/path1) であり、("train") が含まれていないため、ディレクトリ自体が無効であるため、例外がスローされます。任意のパスがディレクトリかどうかをテストするにはどうすればよいですか? 私が知っている限りでは、FileSystem への参照が必要ですが、これは PathFilter のデフォルト パラメータの 1 つではありません。