Python による Hadoop ストリーミングを使用して、入力キーの平均値を計算しようとしています。マッパー、コンバイナー、レデューサーのコードは次のとおりです。
#mapper:
import sys
def map(argv):
line = sys.stdin.readline()
try:
while line:
word, num = line.split()
num = int(num)
print word+'\t'+str(num)
line = sys.stdin.readline()
except Exception, ex:
print 'mapper ex:'+str(ex)
return None
if __name__ == "__main__":
map(sys.argv)
#combiner
import sys
def combine(argv):
line = sys.stdin.readline()
cur_word = ''
cur_num = 0
cur_times = 0
try:
while line:
word, num = line.split('\t')
if word != cur_word:
if cur_word != '':
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)
cur_word = word
cur_num = 0
cur_times = 0
cur_num += int(num)
cur_times += 1
line = sys.stdin.readline()
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)
except Exception, ex:
print 'except:{0}'.format(ex)
return None
if __name__ == "__main__":
combine(sys.argv)
#reducer
import sys
def reduce(argv):
line = sys.stdin.readline()
cur_word = ''
cur_num = 0
cur_times = 0
try:
while line:
word, num, times = line.split('\t')
if word != cur_word:
if cur_word != '':
if cur_times != 0:
avr = cur_num / cur_times
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)+'\t'+str(avr)
else:
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)+'\t'+'0'
cur_word = word
cur_num = 0
cur_times = 0
cur_num += int(num)
cur_times += int(times)
line = sys.stdin.readline()
if cur_times != 0:
avr = cur_num / cur_times
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)+'\t'+str(avr)
else:
print cur_word+'\t'+str(cur_num)+'\t'+str(cur_times)+'\t'+'0'
except Exception, ex:
print 'except:{0}'.format(ex)
return None
if __name__ == "__main__":
reduce(sys.argv)
単純な map-combine-reduce プロセスのようですね。しかし、削減は毎回失敗します。ただし、コンバイナーを使用せず、コンバイナー.pyをリデューサーとして使用すると、機能します。
誰かが助けてくれてありがとう、どうもありがとう。