map reduce での中央値/分位数の計算の例を教えてください。
Datafu の中央値についての私の理解は、「n」マッパーがデータをソートし、データを「1」レデューサーに送信することです。これは、n マッパーからのすべてのデータをソートし、中央値 (中間値) を見つける責任があります。私の理解は正しいですか?
もしそうなら、このアプローチは膨大な量のデータに対応していますか?ありがとう
map reduce での中央値/分位数の計算の例を教えてください。
Datafu の中央値についての私の理解は、「n」マッパーがデータをソートし、データを「1」レデューサーに送信することです。これは、n マッパーからのすべてのデータをソートし、中央値 (中間値) を見つける責任があります。私の理解は正しいですか?
もしそうなら、このアプローチは膨大な量のデータに対応していますか?ありがとう
シリーズの中央値 (中央値) を見つけようとすると、1 つのレデューサーに数値の範囲全体を渡して、どれが「中間」値であるかを判断する必要があります。
入力セットの値の範囲と一意性に応じて、コンバイナーを導入して各値の頻度を出力し、単一のレデューサーに送信されるマップ出力の数を減らすことができます。レデューサーは、並べ替え値と頻度のペアを使用して中央値を特定できます。
これをスケーリングする別の方法 (値の範囲と大まかな分布がわかっている場合) は、カスタム パーティショナーを使用して、キーを範囲バケット (0-99 はレデューサー 0 に、100-199 はレデューサー 2 に、など) で分配することです。の上)。ただし、これには、レデューサーの出力を調べて最終的な中央値の計算を実行するための二次的なジョブが必要になります (たとえば、各レデューサーのキーの数がわかれば、どのレデューサーの出力が中央値を含み、どのオフセットであるかを計算できます)。
正確な中央値と分位数が本当に必要ですか?
多くの場合、おおよその値を取得して、それを操作する方がよいでしょう。特に、これをデータのパーティショニングなどに使用する場合はなおさらです。
実際、おおよその分位数を使用して、正確な分位数を(実際にはO(n/p)
時間内に) 見つける速度を上げることができます。戦略の大まかな概要は次のとおりです。
O(n)
) を実行して、真の分位点を見つけます。各ステップは線形時間です。最もコストのかかるステップはパート 3 です。データ セット全体を再配布する必要があるため、O(n)
ネットワーク トラフィックが生成されます。最初の反復で「代替」分位数を選択することで、おそらくプロセスを最適化できます。たとえば、グローバル中央値を見つけたいとします。線形プロセスでは簡単に見つけることはできませんが、k 個のパーティションに分割すると、おそらくデータ セットの 1/k に絞り込むことができます。したがって、各ノードに中央値を報告させる代わりに、各ノードに (k-1)/(2k) および (k+1)/(2k) のオブジェクトを追加で報告させます。これにより、真の中央値が必要な値の範囲を絞り込むことができます。大きく嘘をつく。したがって、次のステップでは、各ノードが目的の範囲内にあるオブジェクトを 1 つのマスター ノードに送信し、この範囲内の中央値のみを選択できます。
多くの現実のシナリオでは、データセット内の値のカーディナリティは比較的小さくなります。このような場合、問題は 2 つの MapReduce ジョブで効率的に解決できます。
ジョブ 1. はデータ量を大幅に削減し、完全に並行して実行できます。ジョブ 2 のリデューサーは、単純なアプローチと同様に、すべての値ではなくn
( n
= cardinality of your value set
) 項目のみを処理する必要があります。
以下は、ジョブ 2 のレデューサーの例です。これは、Hadoop ストリーミングで直接使用できる Python スクリプトです。データセットの値が であると仮定しますが、 sints
には簡単に採用できますdouble
import sys
item_to_index_range = []
total_count = 0
# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
item, count = line.strip().split("\t", 1)
new_total_count = total_count + int(count)
item_to_index_range.append((item, (total_count + 1, new_total_count + 1)))
total_count = new_total_count
# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
middle_items_indexes += [total_count / 2]
# Retrieve middle item(s)
middle_items = []
for i in middle_items_indexes:
for item, index_range in item_to_index_range:
if i in range(*index_range):
middle_items.append(item)
continue
print sum(middle_items) / float(len(middle_items))
この回答は、最初はChris Whiteの回答から得られた提案に基づいています。答えは、コンバイナーを平均として使用して値の頻度を計算することを提案しています。ただし、MapReduce では、コンバイナーが常に実行されるとは限りません。これにはいくつかの副作用があります。
O((n log n)/p) で並べ替え、次に O(1) で中央値を取得します。
はい... O(n/p) を取得できますが、Hadoop ですぐに使用できる並べ替え機能を使用することはできません。並列 k 番目に大きいアルゴリズムをコーディングするのに 2 ~ 20 時間の開発時間を正当化できない限り、中心の項目を並べ替えて取得します。