1

これは Hadoop での私の最初の実装です。Map Reduce で確率的データセットのアルゴリズムを実装しようとしています。私のデータセットでは、最後の列にいくつかの ID があります (データセット内の一意の ID の数は、クラスター内のノードの数と同じです)。この列の値に基づいてデータセットを分割する必要があり、レコードの各セットはクラスター内の各ノードによって処理される必要があります。

たとえば、クラスターに 3 つのノードがある場合、以下のデータセットでは、1 つのノードが id=1 のすべてのレコードを処理し、別のノードが id=2 のレコードを処理し、別のノードが id=3 のレコードを処理する必要があります。

name time  dept  id
--------------------
 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

私のマップ関数は、各分割を入力として受け取り、各ノードで並行して処理する必要があります。

Hadoopでどのアプローチが可能かを理解しようとしています。このデータセットをマップ関数の入力として入力し、追加の引数を map に渡して、id 値に基づいてデータを分割します。または、事前にデータを「n」(ノード数)のサブセットに分割してノードにロードします。これが正しいアプローチである場合、値に基づいてデータを分割し、異なるノードにロードする方法。私の読書から私が理解したのは、hadoop が指定されたサイズに基づいてデータをブロックに分割したことです。ロード中に特定の条件を指定するにはどうすればよいですか。まとめると、私は自分のプログラムをPythonで書いています。

誰かアドバイスください。ありがとう

4

2 に答える 2

1

おそらく最も簡単なことは、ID をキーとしてマッパーにデータを出力させることです。これにより、1 つのレデューサーが特定の ID のすべてのレコードを取得し、レデューサー フェーズで処理を行うことが保証されます。

例えば、

入力データ:

 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

マッパーコード:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[-1]
    print key + "\t" + line

マップ出力:

 1  b1  2:00pm z1   1
 2  b2  3:00pm z2   2
 1  c1  4:00pm y2   1
 3  b3  3:00pm z3   3
 2  c4  4:00pm x2   2

レデューサー 1 入力:

 1  b1  2:00pm z1   1
 1  c1  4:00pm y2   1

レデューサー 2 入力:

 2  b2  3:00pm z2   2

レデューサー 3 入力:

 3  b3  3:00pm z3   3

レデューサー コード:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    orig_line = "\t".join(cols[1:])
    # do stuff...

この方法では、単一のレデューサーが複数のキーを取得する可能性がありますが、データは順序付けられ、mapred.reduce.tasksオプションでレデューサーの数を制御できることに注意してください。

編集 キーごとにリデューサーでデータを収集したい場合は、次のようなことができます(そのまま実行されるかどうかはわかりませんが、アイデアは得られます)

#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

last_key = None
data = []
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    if last_key and key != last_key:
        process_data(last_key, data)
        data = []
    orig_line = "\t".join(cols[1:])
    data.append(orig_line)
    last_key = key
process_data(last_key, data)

レデューサー ステップでメモリ不足が心配されない場合は、次のようにコードを簡略化できます。

#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

all_data = defaultdict(list)
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    orig_line = "\t".join(cols[1:])
    all_data[key].append(orig_line)
for key, data in all_data.iteritems():
    process_data(key, data)
于 2014-09-16T10:12:35.193 に答える
0

あなたの質問を理解した場合、最善の方法は、データセットをハイブテーブルにロードしてから、Python でUDFを記述することです。その後、次のようにします。

select your_python_udf(name, time, dept, id) from table group by id;

これはreduceフェーズのように見えるので、クエリを起動する前にこれが必要になるかもしれません

set mapred.reduce.tasks=50;

カスタム UDF の作成方法:

ハイブプラグイン

関数の作成

于 2014-09-12T15:49:07.360 に答える