1

私は、マップとリデュースのプリミティブが次のように見える単純なマップリデュース プログラムを持っています。

map(K,V) = (Text, OutputAggregator)
reduce(Text, OutputAggregator) = (Text,Text)

重要な点は、マップ関数から、Writable インターフェイスを実装する独自のクラスである OutputAggregator 型のオブジェクトを発行することです。ただし、私のreduceは次の例外で失敗します。具体的には、readFieds() 関数が例外をスローしています。理由は何ですか?Hadoop 0.18.3 を使用しています

10/09/19 04:04:59 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
10/09/19 04:04:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/09/19 04:04:59 INFO mapred.FileInputFormat: Total input paths to process : 1
10/09/19 04:04:59 INFO mapred.FileInputFormat: Total input paths to process : 1
10/09/19 04:04:59 INFO mapred.FileInputFormat: Total input paths to process : 1
10/09/19 04:04:59 INFO mapred.FileInputFormat: Total input paths to process : 1
10/09/19 04:04:59 INFO mapred.JobClient: Running job: job_local_0001
10/09/19 04:04:59 INFO mapred.MapTask: numReduceTasks: 1
10/09/19 04:04:59 INFO mapred.MapTask: io.sort.mb = 100
10/09/19 04:04:59 INFO mapred.MapTask: data buffer = 79691776/99614720
10/09/19 04:04:59 INFO mapred.MapTask: record buffer = 262144/327680
Length = 10
10
10/09/19 04:04:59 INFO mapred.MapTask: Starting flush of map output
10/09/19 04:04:59 INFO mapred.MapTask: bufstart = 0; bufend = 231; bufvoid = 99614720
10/09/19 04:04:59 INFO mapred.MapTask: kvstart = 0; kvend = 10; length = 327680
gl_books
10/09/19 04:04:59 WARN mapred.LocalJobRunner: job_local_0001
java.lang.NullPointerException
 at org.myorg.OutputAggregator.readFields(OutputAggregator.java:46)
 at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
 at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
 at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:751)
 at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:691)
 at org.apache.hadoop.mapred.Task$CombineValuesIterator.next(Task.java:770)
 at org.myorg.xxxParallelizer$Reduce.reduce(xxxParallelizer.java:117)
 at org.myorg.xxxParallelizer$Reduce.reduce(xxxParallelizer.java:1)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask.java:904)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:785)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:698)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:228)
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:157)
java.io.IOException: Job failed!
 at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1113)
 at org.myorg.xxxParallelizer.main(xxxParallelizer.java:145)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
 at java.lang.reflect.Method.invoke(Unknown Source)
 at org.apache.hadoop.util.RunJar.main(RunJar.java:155)
 at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
 at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
 at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
 at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
4

2 に答える 2

3

カスタム コードに関する質問を投稿する場合: 関連するコードを投稿してください。したがって、46行目と前後の数行の内容は本当に役に立ちます...:)

ただし、これは役立つ場合があります。

独自のWritable Class を作成する際の落とし穴は、Hadoop がクラスの実際のインスタンスを何度も再利用することです。readFields への呼び出しの間に、光沢のある新しいインスタンスを取得しません。

したがって、 readFields メソッドの開始時には、現在のオブジェクトが「ガベージ」で満たされていると想定する必要があり、続行する前にクリアする必要があります。

私の提案は、現在のインスタンスを完全に消去し、インスタンスが作成されてコンストラクターが完了した直後の状態にリセットする「clear()」メソッドを実装することです。そしてもちろん、キーと値の両方について、readFields で最初にそのメソッドを呼び出します。

HTH

于 2010-09-21T11:53:15.640 に答える
2

Niels Basjes の回答に加えて、空のコンストラクター内でメンバー変数を初期化するだけです (提供する必要があります。そうしないと、Hadoop はオブジェクトを初期化できません)。

public OutputAggregator() {
    this.member = new IntWritable();
    ...
}

this.memberそれがタイプであると仮定しIntWritableます。

于 2013-10-17T13:18:01.663 に答える