2

データセットでngramカウントを取得するMapReduceジョブを作成しました。結果は、100個の300MBファイルの形式になります<ngram>\t<count>。これらを1つの結果に結合したいのですが、結合の試みが数回失敗しました(「タスクトラッカーがなくなりました」)。タイムアウトが8時間でしたが、このクラッシュは約8.5時間発生したため、関連している可能性があります。私は#reducers = 5(ノードの数と同じ)を持っていました。エラーはそれを示していないようですが、たぶん私はもっと時間を残す必要があります。ノードが過負荷になり、応答しなくなっているのではないかと思います。私の理論では、私のレデューサーはいくつかの最適化を使用できます。

私はcatマッパーに使用しており、レデューサーには次のPythonスクリプトを使用しています。

#!/usr/bin/env python
import sys

counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if key not in counts:
        counts[key] = 0
    counts[key] += count

for key in sorted(counts.keys()):
    print '%s\t%s'% (key, counts[key])

更新: コメントの1つでほのめかしたように、Hadoopによって自動的にどのような並べ替えが行われるかについて混乱しています。Web UIでは、レデューサーのステータスには、「並べ替え」と「削減」を含むいくつかの異なるフェーズが表示されます。このことから、Hadoopはマッパー出力をreduceに送信する前に並べ替えると仮定しますが、並べ替えがレデューサーに送信されるすべてのデータに対して行われるのか、それとも削減される前の各ファイルに対して行われるのかは明確ではありません。言い換えれば、私のマッパーは100フィールドを取得し、これを400の出力に分割します。cat-それらをレデューサーに送信すると、レデューサー(合計5つ)がそれぞれこれらの80ストリームを受信します。ソートは80をすべて結合しますか、それとも1をソートし、それを減らしますか。等?グラフに基づいて、実際の動作を明らかに示していない可能性がありますが、ソートプロセスは削減の前に実行されます。並べ替えですべての入力ファイルが並べ替えられる場合は、レデューサーを簡略化してすべてのカウントの辞書を保存せず、キーが変更されたらキーとtotalCountのペアを出力することができます。

コンバイナーの使用に関しては、私が削減しようとしている100個のファイルで、削減しているデータがすでに削減されているため、これが私の場合には有益ではないと思います。私の#ノード=#レデューサー(5&5)なので、レデューサーがまだ行っていないことを組み合わせるものは何もありません。

4

2 に答える 2

2

問題は、MapReduceがどのように機能するかについての私の誤解でした。レデューサーに入るすべてのデータがソートされます。上記の私のコードは完全に最適化されていませんでした。代わりに、現在のキーを追跡し、新しいキーが表示されたときに以前の現在のキーを印刷します。

#!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%s\t%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count
# printing out final key if set
if cur_key:
    print '%s\t%s'% (cur_key, cur_key_count)
于 2011-12-09T22:50:42.510 に答える
1

topレデューサーが実行時にCPUバウンドであり、IOバウンドではない(スワッピングを引き起こす可能性がある)ことを確認するために使用します。

ホストあたり8時間/20ジョブは300Mbジョブあたり24分です

heapqメモリに組み込まれたデータ構造がソートされたままに なるようなものを使用できる可能性があります。http: //docs.python.org/library/heapq.htmlのセクション8.4.1を参照してください。

于 2011-11-05T19:47:58.010 に答える