おそらく最も簡単なことは、ID をキーとしてマッパーにデータを出力させることです。これにより、1 つのレデューサーが特定の ID のすべてのレコードを取得し、レデューサー フェーズで処理を行うことが保証されます。
例えば、
入力データ:
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2
マッパーコード:
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[-1]
print key + "\t" + line
マップ出力:
1 b1 2:00pm z1 1
2 b2 3:00pm z2 2
1 c1 4:00pm y2 1
3 b3 3:00pm z3 3
2 c4 4:00pm x2 2
レデューサー 1 入力:
1 b1 2:00pm z1 1
1 c1 4:00pm y2 1
レデューサー 2 入力:
2 b2 3:00pm z2 2
レデューサー 3 入力:
3 b3 3:00pm z3 3
レデューサー コード:
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
orig_line = "\t".join(cols[1:])
# do stuff...
この方法では、単一のレデューサーが複数のキーを取得する可能性がありますが、データは順序付けられ、mapred.reduce.tasksオプションでレデューサーの数を制御できることに注意してください。
編集
キーごとにリデューサーでデータを収集したい場合は、次のようなことができます(そのまま実行されるかどうかはわかりませんが、アイデアは得られます)
#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
# data_list has all the lines for key_id
last_key = None
data = []
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
if last_key and key != last_key:
process_data(last_key, data)
data = []
orig_line = "\t".join(cols[1:])
data.append(orig_line)
last_key = key
process_data(last_key, data)
レデューサー ステップでメモリ不足が心配されない場合は、次のようにコードを簡略化できます。
#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
# data_list has all the lines for key_id
all_data = defaultdict(list)
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
orig_line = "\t".join(cols[1:])
all_data[key].append(orig_line)
for key, data in all_data.iteritems():
process_data(key, data)