0

Python の単一の Hadoop 疑似分散ノードでストリーミング Hadoop ジョブを実行しています。また、hadoop-lzo を使用して .lzo 圧縮入力ファイルで分割を生成しています。

小さな圧縮または非圧縮のテスト データセットを使用すると、すべてが期待どおりに機能します。MapReduce の出力は、単純な 'cat | 地図 | ソート | UNIX のパイプラインを減らします。- 入力が圧縮されているかどうか。

ただし、単一の大きな .lzo (事前にインデックス付けされた) データセット (最大 40GB 圧縮) の処理に移行し、ジョブが複数のマッパーに分割されると、出力が切り詰められたように見えます。最初のいくつかのキー値のみが存在します。

コードと出力が続きます。ご覧のとおり、プロセス全体をテストするための非常に単純なカウントです。

テスト データ (大規模なデータセットのサブセット) に対する単純な UNIX パイプラインからの出力。

lzop -cd objectdata_input.lzo | ./objectdata_map.py | sort | ./objectdata_red.py

3656  3
3671  3
51    6

テストデータに対する Hadoop ジョブからの出力 (上記と同じテストデータ)

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py

3656  3
3671  3
51    6

ここで、テスト データは実際のデータセットからの行の小さなサブセットであるため、ジョブが完全なデータセットに対して実行されたときに、結果の出力に上記のキーが表示されることを少なくとも期待できます。しかし、私が得たものは次のとおりです。

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input_full.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py

1       40475582
12      48874
14      8929777
15      219984
16      161340
17      793211
18      78862
19      47561
2       14279960
20      56399
21      3360
22      944639
23      1384073
24      956886
25      9667
26      51542
27      2796
28      336767
29      840
3       3874316
30      1776
33      1448
34      12144
35      1872
36      1919
37      2035
38      291
39      422
4       539750
40      1820
41      1627
42      97678
43      67581
44      11009
45      938
46      849
47      375
48      876
49      671
5       262848
50      5674
51      90
6       6459687
7       4711612
8       20505097
9       135592

...データセットに基づいて予想されるよりもはるかに少ないキーがあります。

私はキー自体にあまり悩まされていません - このセットは、入力データセットを考えると予想される可能性があります。私は、さらに多くのキーが数千単位である必要があることをより懸念しています。データセットの最初の 2500 万レコードに対して UNIX パイプラインでコードを実行すると、約 1 ~ 7000 の範囲のキーが取得されます。

したがって、この出力は、私が実際に期待するものの最初の数行にすぎないように見えますが、その理由はわかりません。多くの part-0000# ファイルの照合が欠落していますか? または似たようなものですか?これは、私が自宅でテストしている単一ノードの疑似分散 Hadoop にすぎないため、収集する part-# ファイルがさらにある場合、それらがどこにあるのかわかりません。HDFS のretention_counts ディレクトリには表示されません。

マッパーとリデューサーのコードは次のとおりです。実際には、多くの単語数の例が浮かんでいるのと同じです。

objectdata_map.py

#!/usr/bin/env python

import sys
RETENTION_DAYS=(8321, 8335)

for line in sys.stdin:
        line=line.strip()
        try:
                retention_days=int(line[RETENTION_DAYS[0]:RETENTION_DAYS[1]])
                print "%s\t%s" % (retention_days,1)
        except:
                continue

objectdata_red.py

#!/usr/bin/env python                                                                                                                                    

import sys                                                                                                                                               
last_key=None
key_count=0
for line in sys.stdin:
        key=line.split('\t')[0]
        if last_key and last_key!=key:
                print "%s\t%s" % (last_key,key_count)
                key_count=1
        else:
                key_count+=1

        last_key=key

print "%s\t%s" % (last_key,key_count)

これはすべて、手動でインストールされた hadoop 1.1.2、疑似分散モードで、hadoop-lzo からビルドおよびインストールされたものです。

https://github.com/kevinweil/hadoop-lzo

4

0 に答える 0