データセットでngramカウントを取得するMapReduceジョブを作成しました。結果は、100個の300MBファイルの形式になります<ngram>\t<count>
。これらを1つの結果に結合したいのですが、結合の試みが数回失敗しました(「タスクトラッカーがなくなりました」)。タイムアウトが8時間でしたが、このクラッシュは約8.5時間発生したため、関連している可能性があります。私は#reducers = 5(ノードの数と同じ)を持っていました。エラーはそれを示していないようですが、たぶん私はもっと時間を残す必要があります。ノードが過負荷になり、応答しなくなっているのではないかと思います。私の理論では、私のレデューサーはいくつかの最適化を使用できます。
私はcat
マッパーに使用しており、レデューサーには次のPythonスクリプトを使用しています。
#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if key not in counts:
counts[key] = 0
counts[key] += count
for key in sorted(counts.keys()):
print '%s\t%s'% (key, counts[key])
更新:
コメントの1つでほのめかしたように、Hadoopによって自動的にどのような並べ替えが行われるかについて混乱しています。Web UIでは、レデューサーのステータスには、「並べ替え」と「削減」を含むいくつかの異なるフェーズが表示されます。このことから、Hadoopはマッパー出力をreduceに送信する前に並べ替えると仮定しますが、並べ替えがレデューサーに送信されるすべてのデータに対して行われるのか、それとも削減される前の各ファイルに対して行われるのかは明確ではありません。言い換えれば、私のマッパーは100フィールドを取得し、これを400の出力に分割します。cat
-それらをレデューサーに送信すると、レデューサー(合計5つ)がそれぞれこれらの80ストリームを受信します。ソートは80をすべて結合しますか、それとも1をソートし、それを減らしますか。等?グラフに基づいて、実際の動作を明らかに示していない可能性がありますが、ソートプロセスは削減の前に実行されます。並べ替えですべての入力ファイルが並べ替えられる場合は、レデューサーを簡略化してすべてのカウントの辞書を保存せず、キーが変更されたらキーとtotalCountのペアを出力することができます。
コンバイナーの使用に関しては、私が削減しようとしている100個のファイルで、削減しているデータがすでに削減されているため、これが私の場合には有益ではないと思います。私の#ノード=#レデューサー(5&5)なので、レデューサーがまだ行っていないことを組み合わせるものは何もありません。