9

行レベルの条件に基づいて大規模なデータセットから多数のランダム サンプルを選択する map/reduce ジョブを作成したいと考えています。中間キーの数を最小限に抑えたい。

擬似コード:

for each row 
  if row matches condition
    put the row.id in the bucket if the bucket is not already large enough

このようなことをしたことがありますか?よく知られているアルゴリズムはありますか?

連続した行を含むサンプルでも十分です。

ありがとう。

4

3 に答える 3

12

マッパー: それぞれがランダムな整数キーを持つすべての適格な値を出力します。

単一のレデューサー: 最初の N 個の値を出力し、キーを破棄します。

ソーターは、マッパーの出力順序をランダム化します。マッパーがいくつの適格な値を見つけるかわからないため、各マッパーはそのパーティションからすべての適格な値を出力する必要があります。

一般に、Hadoop 機構をできるだけ多く使用する、このような単純なマッパー/リデューサー ツールを構築するのが好きです。私はそれらをさまざまなタスクで再利用することになります。

于 2010-03-25T19:12:08.450 に答える
8

カールのアプローチは問題なく機能しますが、マッパーによって生成されるデータの量を大幅に減らすことができます。

必要なサンプル数をKとします。これは、ノードの 1 つでメモリに保持するのに十分小さいと想定します。一致する各行にランダムな値を割り当て、選択アルゴリズムの修正を使用してK個の最小値を見つけます。

各マッパーのセットアップ部分で、プライオリティ キューを作成します。これには、フィボナッチ ヒープが適しています。フロートを優先順位として使用します。膨大な量のデータがある場合は、引き分けを避けるために double の方が適切な場合があります。条件に一致する行ごとに、優先順位として 0 から 1 の間でランダムに選択された float を使用して、その行を優先キューに挿入します。キューにK個を超えるものがある場合は、最も価値の高いアイテムを削除します (これは、標準的なフィボナッチ ヒープの用語とは逆です)。

最後に、マッパーの最後で、キュー内のすべてを発行します。発行するアイテムごとに、キーとして優先度を使用しFloatWritable、対応する行の表現を値 (行 ID、または行の内容全体) として使用します。マッパーごとにK個の値のみを出力します (そのマッパーに一致する行がK個より少ない場合はそれ以下)。

単一のレデューサーでは、Hadoop はキーを最小から最大の順に自動的にスキャンします。最初に表示されたK個のキー ( K個の最小キー) に対応する行を発行してから、終了します。

これが機能するのは、一致する各行がK個の最小の float 値の 1 つを持つ確率が同じであるためです。各マッパーのK個の最小フロートを追跡して、見逃しがないことを確認してから、それらをレデューサーに送信して、全体で最小のK個を見つけます。

于 2010-04-06T12:31:49.620 に答える
2

Bkkbrad のアプローチは、各マッパーから送信されるレコードの数が (最大で) K であるという点で、おそらく最も効率的です。一方、サンプル自体 (つまり、K 個の要素) が単一のメモリに収まると仮定していることに注意してください。減速機。

そうでない場合は、マッパーによって一致する各行に {1,..,K} のランダムな整数が割り当てられ、reduce フェーズでキーごとに 1 つの要素が選択される、完全に分散されたアプローチを単純に使用したくなるかもしれません。 (この質問も参照してください)。ただし、このアプローチの問題は、たまたま特定のキーに行が割り当てられない場合があり、その場合、最終的なサンプルの要素が K 未満になることです。これは、K が行の総数 N よりもはるかに小さい場合は小さな確率で発生しますが、K が N の一定の割合である場合 (K=N/3 の場合) は一定の確率で発生します。

機能する解決策は次のとおりです。B 個のバケットがあり、まず各要素をランダムなバケットに入れ、次に各バケットでランダムな順序を生成することにより、要素のランダムな順序を生成するとします。最初のバケットの要素は、(順序に関して) 2 番目のバケットの要素よりも小さいと見なされます。次に、サイズ K のサンプルを選択する場合、最初の j 個のバケット内のすべての要素を収集できます (全体で K 未満の要素数 t が含まれている場合)。次に、次のバケットから残りの Kt 個の要素を選択します。ここで、B は、N/B 個の要素がメモリに収まるようなパラメーターです。重要な側面は、バケットを並行して処理できることです。

Mapper: それぞれがランダムなキー (j, r) を持つすべての条件を満たす行を出力します。ここで、j は {1,..,B} 内のランダムな整数で、r はランダムな float です。さらに、j 未満のキー (1<=j<=B の場合) を持つ要素の数を追跡し、この情報をレデューサーに送信します。

シャッフル: j で分割し、r で 2 次並べ替えを行います。

レデューサー: バケット j を考えてみましょう。リデューサーは、j 未満のバケットに含まれる要素の数と、バケット j に含まれる要素の数を (マッパーが受け取った情報を集約することによって) 知っていると仮定します。j 以下のバケット内の要素数が K 以下の場合、バケット j 内のすべての要素を出力します。バケットが厳密に j より小さい要素の数が t < K の場合、リザーバー サンプリングを実行して、バケットから K−t 個のランダムな要素を選択します。残りの場合、つまり j 未満のバケット内の要素の数が少なくとも K の場合は、何も出力しません。

この問題のより簡単な解決策を私は知りませんが、あればいいと思います。

詳細については、こちらのブログをご覧ください。

于 2013-08-18T01:47:07.187 に答える