Python http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/で Hadoop ワード カウントの例を理解しようとしてい ます。
作成者は、単純なバージョンのマッパーとリデューサーから始めます。これがレデューサーです(簡潔にするためにいくつかのコメントを削除しました)
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
作成者は、以下を使用してプログラムをテストします。
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
したがって、レデューサーは、レデューサー ジョブの入力データが次のようであるかのように記述されます。
aa 1
aa 1
bb 1
cc 1
cc 1
cc 1
レデューサーについて私が最初に理解したのは、特定のレデューサーの入力データには 1 つの一意のキーが含まれるということでした。したがって、前の例では、3 つのレデューサー ジョブが必要になります。私の理解は間違っていますか?
次に、著者はマッパーとリデューサーの改良版を提示します。レデューサーは次のとおりです。
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
total_count = sum(int(count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except ValueError:
# count was not a number, so silently discard this item
pass
if __name__ == "__main__":
main()
著者は次の警告を追加します。
注: 次の Map および Reduce スクリプトは、Hadoop コンテキストで実行された場合、つまり MapReduce ジョブの Mapper および Reducer として実行された場合にのみ「正しく」機能します。これは、単純なテスト コマンド「cat DATA | ./マッパー.py | 並べ替え -k1,1 | ./reducer.py」は、一部の機能が意図的に Hadoop にアウトソーシングされているため、正しく動作しなくなります。
単純なテスト コマンドが新しいバージョンで機能しない理由がわかりません。sort -k1,1
を使用すると、両方のバージョンのレデューサーで同じ入力が生成されると思いました。私は何が欠けていますか?