0

Python http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/で Hadoop ワード カウントの例を理解しようとしてい ます。

作成者は、単純なバージョンのマッパーとリデューサーから始めます。これがレデューサーです(簡潔にするためにいくつかのコメントを削除しました)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()

    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

作成者は、以下を使用してプログラムをテストします。

echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py

したがって、レデューサーは、レデューサー ジョブの入力データが次のようであるかのように記述されます。

aa 1
aa 1
bb 1
cc 1
cc 1
cc 1

レデューサーについて私が最初に理解したのは、特定のレデューサーの入力データには 1 つの一意のキーが含まれるということでした。したがって、前の例では、3 つのレデューサー ジョブが必要になります。私の理解は間違っていますか?

次に、著者はマッパーとリデューサーの改良版を提示します。レデューサーは次のとおりです。

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)

    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

著者は次の警告を追加します。

注: 次の Map および Reduce スクリプトは、Hadoop コンテキストで実行された場合、つまり MapReduce ジョブの Mapper および Reducer として実行された場合にのみ「正しく」機能します。これは、単純なテスト コマンド「cat DATA | ./マッパー.py | 並べ替え -k1,1 | ./reducer.py」は、一部の機能が意図的に Hadoop にアウトソーシングされているため、正しく動作しなくなります。

単純なテスト コマンドが新しいバージョンで機能しない理由がわかりません。sort -k1,1を使用すると、両方のバージョンのレデューサーで同じ入力が生成されると思いました。私は何が欠けていますか?

4

1 に答える 1

0

あなたの最初の質問について:「レデューサーについての私の最初の理解は、特定のレデューサーの入力データには1つの一意のキーが含まれるということでした。したがって、前の例では、3つのレデューサージョブが必要になります。私の理解は間違っていますか?」

MapReduce の抽象化と、その抽象化の Hadoop の実装には違いがあります。抽象化では、リデューサーは一意のキーに関連付けられます。一方、Hadoop の実装では、複数のキーを同じレデューサーに割り当てます (プロセスを閉じて新しいプロセスを開始するコストを回避するため)。特に、Hadoop ストリーミングでは、リデューサーは特定の数のキー (ゼロ、1 つ、または複数のキー) に対応するキーと値のペアを受け取りますが、特定のキーに関連付けられたキーと値のペアがお互いに連続して来ます。

たとえば、入力「foo foo quux labs foo bar quux」を使用した単語カウントの例を見てみましょう。次に、レデューサーが入力「bar 1\nfoo 1\nfoo 1\nfoo1」を受信し、別のレデューサーが「labs 1\nquux 1\nquux 1」を受信する可能性があります。実行される実際のレデューサー プロセスの数は、オプション mapred.reduce.tasks を使用して決定されます。たとえば、2つのレデューサーを使用するには、次のことができます

 $ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper ....

あなたの2番目の質問に関してsort -k1,1は、トリックを行うことに同意するので、問題もわかりません。

于 2013-08-18T05:31:02.427 に答える