2

一部のウィキペディア ダンプ (圧縮された bz2 形式) に対して Java Mapper/Reducer を使用して Hadoop ストリーミング ジョブを実行しようとしています。ウィキメディアが最近リリースしたインターフェースであるWikiHadoopを使用しようとしています。

WikiReader_Mapper.java

package courseproj.example;

// Mapper: emits (token, 1) for every article occurrence.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}

WikiReader_Reducer.java

package courseproj.example;

//Reducer: sums up all the counts.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable SUM = new IntWritable();

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}

私が実行しているコマンドは

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
       -libjars lib2/wikihadoop-0.2.jar \
       -D mapreduce.input.fileinputformat.split.minsize=300000000 \
       -D mapreduce.task.timeout=6000000 \
       -D org.wikimedia.wikihadoop.previousRevision=false \
       -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
       -output out \
       -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
       -mapper WikiReader_Mapper \
       -reducer WikiReader_Reducer

そして、私が得ているエラーメッセージは

Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)

Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)

Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)

古い Hadoop API よりも新しい Hadoop API の方がよくわかります。マッパーとレデューサーのコードは 2 つの異なるファイルにあるため、ジョブの JobConf 構成パラメーターを定義すると同時に、hadoop ストリーミングのコマンド構造に従います (マッパーとレデューサーのクラスを明示的に設定します)。マッパーとリデューサーのコードをすべて 1 つのクラス (Configured を拡張し、Tool を実装します。これは新しい API で行われます) にラップし、クラス名を Hadoop ストリーミング コマンド ラインに渡すのではなく、クラスを別々にマップして縮小しますか?

4

1 に答える 1

0

ストリーミングは古い API ( org.apache.hadoop.mapred) を使用しますが、マッパー クラスとリデューサー クラスは新しい API クラスを拡張します ( org.apache.hadoop.mapreduce)。

マッパーを実装org.apache.hadoop.mapred.Mapperに変更し、リデューサーを実装に変更してみてください。org.apache.hadoop.mapred.Reducer次に例を示します。

package courseproj.example;

// Mapper: emits ("article", 1) for every article occurrence.
public class WikiReader_Mapper implements Mapper<Text, Text, Text, IntWritable> {

  // Reuse objects to save overhead of object creation.
  private final static Text KEY = new Text();
  private final static IntWritable VALUE = new IntWritable(1);

  @Override
  public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
      throws IOException, InterruptedException {
    KEY.set("article count");
    collector.collect(KEY, VALUE);
  }
}
于 2013-04-17T00:02:47.470 に答える