0

AmazonEMRがカスタムInputFileFormatを受け入れるのに少し問題があります。

public class Main extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new JobConf(), new Main(), args);
        System.exit(res);
    }


    public int run(String[] args) throws Exception {

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        System.out.println("Input  path: "+inputPath+"\n");
        System.out.println("Output path: "+outputPath+"\n");

        Configuration conf = getConf();
        Job job = new Job(conf, "ProcessDocs");

        job.setJarByClass(Main.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(XmlInputFormat.class);

        TextInputFormat.setInputPaths(job, inputPath);
        TextOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);

        return 0;
    }   
}

ログファイルを見る:

2012-06-04 23:35:20,053 INFO org.apache.hadoop.mapred.JobClient (main): Default number of map tasks: null
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Setting default number of map tasks based on cluster size to : 6
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Default number of reduce tasks: 1
2012-06-04 23:35:20,767 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2012-06-04 23:35:20,813 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (main): Loaded native gpl library
2012-06-04 23:35:20,886 WARN com.hadoop.compression.lzo.LzoCodec (main): Could not find build properties file with revision hash
2012-06-04 23:35:20,886 INFO com.hadoop.compression.lzo.LzoCodec (main): Successfully loaded & initialized native-lzo library [hadoop-lzo rev UNKNOWN]
2012-06-04 23:35:20,906 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library is available
2012-06-04 23:35:20,906 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library loaded
2012-06-04 23:35:22,240 INFO org.apache.hadoop.mapred.JobClient (main): Running job: job_201206042333_0001

EMRのHadoopがデフォルトのリーダーを想定しているInputFileFormatようです...何が間違っているのですか?

注:の可用性に関してHadoopからエラーが発生することはありませんXmlInputClass。*注2:*ファイルを取得<property><name>mapreduce.inputformat.class</name><value>com.xyz.XmlInputFormat</value></property>jobs/some_job_id.conf.xmlます。

アップデート:

public class XmlInputFormat extends TextInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

      System.out.println("Creating a new 'XmlRecordReader'");

      return new XmlRecordReader((FileSplit) split, context.getJobConf());
  }

  /*
  @Override
  public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                         JobConf jobConf,
                                                         Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) inputSplit, jobConf);
  }
  */

  /**
   * XMLRecordReader class to read through a given xml document to output xml
   * blocks as records as specified by the start tag and end tag
   * 
   */
  public static class XmlRecordReader implements RecordReader<LongWritable,Text> {
    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

      System.out.println("XmlInputFormat: Start Tag: "+startTag);
      System.out.println("XmlInputFormat: End Tag  : "+endTag);

      // open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(jobConf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }
    ...
4

2 に答える 2

0

XmlInputFormatがを含む同じjarの一部でない場合はmain()、メインjarの「lib」と呼ばれる「サブフォルダー」にビルドするか、XmlInputFormatS3からを含む追加のjarをコピーするブートストラップアクションを作成する必要があります。 /home/hadoop/libEMRのデフォルトでHadoopクラスパスの一部であるマジックフォルダ。

確かに、抽象であるFileInputFormatを想定していません。

あなたの編集に基づいて、私はあなたの質問の前提が間違っていると思います。入力フォーマットが実際に見つかり、使用されたのではないかと思います。タスク試行からのSystem.out.printlnは、stdoutダイジェストに表示される場合がありますが、ジョブのsyslogには含まれません。

于 2012-06-05T00:50:30.397 に答える
0

これは、このカスタムjarファイルをEMRまたはHadoopで実行するために私が見つけたもう1つの簡単な方法です http://www.applams.com/2014/05/using-custom-streaming-jar-using-custom.html

于 2014-05-20T02:26:17.443 に答える