2

Hadoop 0.20.2 (変更不可) を使用しており、入力パスにフィルターを追加したいと考えています。データは次のようになります。

/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2

そして、 trainを含むすべてのファイルを処理したいだけです。

FileInputFormat クラスを見ると、次の使用が提案されています。

 FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)

PathFilterはインターフェイスであるため、これが私の問題の始まりです。もちろん、インターフェイスを拡張できますが、まだ実装がありません。その代わりに、私はインターフェースを実装しました:

class TrainFilter implements PathFilter
{
   boolean accept(Path path)
   {
      return path.toString().contains("train");
   }
}

TrainFilter を PathFilter として使用すると、コードはコンパイルされますが、実行すると、入力パスが台無しになるため例外が発生します。フィルターを設定しないと、コードは /path1 の下にあるすべてのファイルを実行しますが、フィルターを設定すると、次のエラーがスローされます。

InvalidInputException: Input path does not exist hdfs://localhost:9000/path1

ドライバーコードで設定する方法は次のとおりです。

job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);

ここで私が間違っていることの提案はありますか?

編集:問題が見つかりました。PathFilter への最初の呼び出しは常にディレクトリ自体 (/path1) であり、("train") が含まれていないため、ディレクトリ自体が無効であるため、例外がスローされます。任意のパスがディレクトリかどうかをテストするにはどうすればよいですか? 私が知っている限りでは、FileSystem への参照が必要ですが、これは PathFilter のデフォルト パラメータの 1 つではありません。

4

4 に答える 4

6

または、指定されたディレクトリ内のすべてのファイルをループして、ファイル名がtrainで始まるかどうかを確認することもできます。例えば:

        Job job = new Job(conf, "myJob");
        List<Path> inputhPaths = new ArrayList<Path>();

        String basePath = "/user/hadoop/path";
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
        for (FileStatus fstat : listStatus) {
            inputhPaths.add(fstat.getPath());
        }

        FileInputFormat.setInputPaths(job,
                (Path[]) inputhPaths.toArray(new Path[inputhPaths.size()]));
于 2012-11-19T12:57:49.617 に答える
2

簡単な修正、パスに「test」が含まれている場合にfalseを返すなど、ホワイトリストの代わりにパスをブラックリストに登録できます

于 2014-05-06T19:27:57.943 に答える
1

Filter に Configurable インターフェイスを実装する (または Configured クラスを拡張する) ことで FileSystem インスタンスを取得し、setConf メソッドで fileSystem インスタンス変数を作成できます。

class TrainFilter extends Configured implements PathFilter
{
   FileSystem fileSystem;

   boolean accept(Path path)
   {
      // TODO: use fileSystem here to determine if path is a directory
      return path.toString().contains("train");
   }

   public void setConf(Configuration conf) {
     if (conf != null) {
       fileSystem = FileSystem.get(conf);
     }
   }
}
于 2012-11-27T01:32:29.843 に答える