1

yahooのhadoopチュートリアルにある添付の画像によると、操作の順序は、マップ>結合>パーティションであり、その後にreduceが続く必要があります。

これが、マップ操作によって送信されるキーの例です。

LongValueSum:geo_US|1311722400|E        1

同じタイプのキーが100個あるとすると、これは次のように組み合わせる必要があります。

geo_US|1311722400|E     100

次に、最初のパイプの前の値でキーを分割したいと思います(|) http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary + sort%2C + the + -partitioner + org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner + option%29

geo_US

これが私のストリーミングコマンドです

hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path

これは私が得るエラーです

java.lang.NumberFormatException: For input string: "1311722400|E    1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)

コンバイナーの前にパーティショナーが実行されているようです。何かご意見は?

4

2 に答える 2

1

コンバイナーが実際にHadoopバージョン>0.16で実行されるという保証はありません。hadoop 17では、単一<K,V>がソートバッファー全体を占有している場合、コンバイナーは実行されません。バージョン>0.18では、コンバイナーはマップフェーズとリデュースフェーズの両方で複数回実行できます。

基本的に、あなたのアルゴリズムは、Combine関数が呼び出されるかどうかに依存するべきではありません。これは、単なる最適化を目的としているためです。詳細については、本Haddop、決定的なガイドをチェックしてください..ここでグーグルブックの結合関数について説明しているスニペットを見つけました

于 2011-08-07T03:29:06.257 に答える
1

「Hadoop Definitive Guide」第6章シャッフルとソートを確認しました。マップ出力は、最初にメモリにバッファリングされます。メモリがしきい値を超えると、マップ出力がディスクに書き込まれます。ディスクに書き込む前に、データは分割されます。各パーティション内で、データはキーでソートされます。あとはコンバイナ機能があればソート出力を結合します。

ディスク上に多数のスピル ファイルが存在する可能性があります。スピル ファイルが 3 つ以上ある場合は、出力がディスクに書き込まれる前にコンバイナーが再度実行されます。

最後に、IO の数を減らすために、すべてのスピル ファイルが 1 つのファイルにマージされます。

つまり、マッパーの場合: マップ --> パーティション --> ソート ---> コンバイナー

reduer の場合: フォーム マッパーのコピー --> マージ (存在する場合はコンバイナーが呼び出されます) -> 縮小

于 2014-03-20T03:38:26.500 に答える