Python の単一の Hadoop 疑似分散ノードでストリーミング Hadoop ジョブを実行しています。また、hadoop-lzo を使用して .lzo 圧縮入力ファイルで分割を生成しています。
小さな圧縮または非圧縮のテスト データセットを使用すると、すべてが期待どおりに機能します。MapReduce の出力は、単純な 'cat | 地図 | ソート | UNIX のパイプラインを減らします。- 入力が圧縮されているかどうか。
ただし、単一の大きな .lzo (事前にインデックス付けされた) データセット (最大 40GB 圧縮) の処理に移行し、ジョブが複数のマッパーに分割されると、出力が切り詰められたように見えます。最初のいくつかのキー値のみが存在します。
コードと出力が続きます。ご覧のとおり、プロセス全体をテストするための非常に単純なカウントです。
テスト データ (大規模なデータセットのサブセット) に対する単純な UNIX パイプラインからの出力。
lzop -cd objectdata_input.lzo | ./objectdata_map.py | sort | ./objectdata_red.py
3656 3
3671 3
51 6
テストデータに対する Hadoop ジョブからの出力 (上記と同じテストデータ)
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
3656 3
3671 3
51 6
ここで、テスト データは実際のデータセットからの行の小さなサブセットであるため、ジョブが完全なデータセットに対して実行されたときに、結果の出力に上記のキーが表示されることを少なくとも期待できます。しかし、私が得たものは次のとおりです。
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input_full.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
1 40475582
12 48874
14 8929777
15 219984
16 161340
17 793211
18 78862
19 47561
2 14279960
20 56399
21 3360
22 944639
23 1384073
24 956886
25 9667
26 51542
27 2796
28 336767
29 840
3 3874316
30 1776
33 1448
34 12144
35 1872
36 1919
37 2035
38 291
39 422
4 539750
40 1820
41 1627
42 97678
43 67581
44 11009
45 938
46 849
47 375
48 876
49 671
5 262848
50 5674
51 90
6 6459687
7 4711612
8 20505097
9 135592
...データセットに基づいて予想されるよりもはるかに少ないキーがあります。
私はキー自体にあまり悩まされていません - このセットは、入力データセットを考えると予想される可能性があります。私は、さらに多くのキーが数千単位である必要があることをより懸念しています。データセットの最初の 2500 万レコードに対して UNIX パイプラインでコードを実行すると、約 1 ~ 7000 の範囲のキーが取得されます。
したがって、この出力は、私が実際に期待するものの最初の数行にすぎないように見えますが、その理由はわかりません。多くの part-0000# ファイルの照合が欠落していますか? または似たようなものですか?これは、私が自宅でテストしている単一ノードの疑似分散 Hadoop にすぎないため、収集する part-# ファイルがさらにある場合、それらがどこにあるのかわかりません。HDFS のretention_counts ディレクトリには表示されません。
マッパーとリデューサーのコードは次のとおりです。実際には、多くの単語数の例が浮かんでいるのと同じです。
objectdata_map.py
#!/usr/bin/env python
import sys
RETENTION_DAYS=(8321, 8335)
for line in sys.stdin:
line=line.strip()
try:
retention_days=int(line[RETENTION_DAYS[0]:RETENTION_DAYS[1]])
print "%s\t%s" % (retention_days,1)
except:
continue
objectdata_red.py
#!/usr/bin/env python
import sys
last_key=None
key_count=0
for line in sys.stdin:
key=line.split('\t')[0]
if last_key and last_key!=key:
print "%s\t%s" % (last_key,key_count)
key_count=1
else:
key_count+=1
last_key=key
print "%s\t%s" % (last_key,key_count)
これはすべて、手動でインストールされた hadoop 1.1.2、疑似分散モードで、hadoop-lzo からビルドおよびインストールされたものです。