85

特にGoogleの超並列計算システムのコンテキストでは、map/reduceについて多くのことを聞きます。正確には何ですか?

4

5 に答える 5

70

Google のMapReduce研究発表ページの要約から:

MapReduce はプログラミング モデルであり、大規模なデータ セットを処理および生成するための関連する実装です。ユーザーは、キーと値のペアを処理して一連の中間キーと値のペアを生成する map 関数と、同じ中間キーに関連付けられたすべての中間値をマージする reduce 関数を指定します。

MapReduce の利点は、複数の処理ノード (複数のサーバー) で並列に処理を実行できるため、非常に拡張性に優れたシステムです。

これは関数型プログラミングモデルに基づいているため、mapおよびreduceステップにはそれぞれ副作用がなく (mapプロセスの各サブセクションの状態と結果は別のサブセクションに依存しません)、マッピングおよび縮小されるデータ セットをそれぞれ分離できます。複数の処理ノードにわたって。

ジョエルのプログラミング言語はこれを行うことができますか? この記事では、Google が検索エンジンを強化する MapReduce を開発するために、関数型プログラミングを理解することがいかに重要であったかについて説明しています。関数型プログラミングと、関数型プログラミングがスケーラブルなコードを可能にする方法に慣れていない場合は、非常に良い読み物です。

参照:ウィキペディア: MapReduce

関連質問: mapreduce を簡単に説明してください

于 2008-12-23T07:00:18.457 に答える
16

マップは、リスト上のすべてのアイテムに別の関数を適用して、すべての戻り値を含む別のリストを生成する関数です。(「fをxに適用する」という別の言い方は、「fを呼び出し、xを渡す」です。したがって、「呼び出す」の代わりに「適用する」と言う方が良い場合があります。)

これはおそらくマップがC#で書かれている方法です(それSelectは呼び出され、標準ライブラリにあります):

public static IEnumerable<R> Select<T, R>(this IEnumerable<T> list, Func<T, R> func)
{
    foreach (T item in list)
        yield return func(item);
}

あなたはJavaの男であり、Joel Spolskyは、JavaがいかにくだらないかについてGROSSLY UNFAIR LIESに話すのが好きです(実際、彼は嘘をついていません、それはくだらないですが、私はあなたを倒そうとしています)、これが私の非常に大まかな試みですJavaバージョン(私はJavaコンパイラを持っておらず、Javaバージョン1.1を漠然と覚えています!):

// represents a function that takes one arg and returns a result
public interface IFunctor
{
    object invoke(object arg);
}

public static object[] map(object[] list, IFunctor func)
{
    object[] returnValues = new object[list.length];

    for (int n = 0; n < list.length; n++)
        returnValues[n] = func.invoke(list[n]);

    return returnValues;
}

これは100万通りの方法で改善できると確信しています。しかし、それは基本的な考え方です。

削減は、リスト上のすべての項目を単一の値に変換する関数です。funcこれを行うには、 2つの項目を1つの値に変換する別の関数を指定する必要があります。最初の2つの項目をに与えることで機能しfuncます。次に、その結​​果と3番目の項目。次に、4番目の項目での結果、以下同様に、すべての項目がなくなり、1つの値が残るまで続きます。

C#ではreduceが呼び出されAggregate、再び標準ライブラリにあります。Javaバージョンに直接スキップします。

// represents a function that takes two args and returns a result
public interface IBinaryFunctor
{
    object invoke(object arg1, object arg2);
}

public static object reduce(object[] list, IBinaryFunctor func)
{
    if (list.length == 0)
        return null; // or throw something?

    if (list.length == 1)
        return list[0]; // just return the only item

    object returnValue = func.invoke(list[0], list[1]);

    for (int n = 1; n < list.length; n++)
        returnValue = func.invoke(returnValue, list[n]);

    return returnValue;
}

これらのJavaバージョンにはジェネリックスを追加する必要がありますが、Javaでそれを行う方法がわかりません。ただし、ファンクターを提供するために、匿名の内部クラスを渡すことができるはずです。

string[] names = getLotsOfNames();

string commaSeparatedNames = (string)reduce(names, 
   new IBinaryFunctor {
       public object invoke(object arg1, object arg2)
           { return ((string)arg1) + ", " + ((string)arg2); }
   }

うまくいけば、ジェネリックはキャストを取り除くでしょう。C#で同等のタイプセーフは次のとおりです。

string commaSeparatedNames = names.Aggregate((a, b) => a + ", " + b);

なぜこれが「かっこいい」のですか?大きな計算を小さな断片に分割して、さまざまな方法で元に戻すことができる簡単な方法は、常にクールです。Googleがこのアイデアを適用する方法は、並列化です。これは、mapとreduceの両方を複数のコンピューターで共有できるためです。

ただし、重要な要件は、言語が関数を値として処理できることではありません。どのオブジェクト指向言語でもそれを行うことができます。並列化の実際の要件は、funcmapおよびreduceに渡す小さな関数が、状態を使用または更新してはならないことです。渡された引数のみに依存する値を返す必要があります。そうしないと、すべてを並行して実行しようとすると、結果が完全に台無しになります。

于 2008-12-23T10:07:41.090 に答える
2

非常に長いワッフルまたは非常に短い漠然としたブログ投稿のいずれかに最も不満を感じた後、私は最終的にこの非常に優れた厳密で簡潔な論文を発見しました.

次に先に進み、Scala に変換してより簡潔にしました。ここでは、ユーザーが単にアプリケーションのmapreduceの部分を指定するだけの最も単純なケースを提供しました。Hadoop/Spark では、厳密に言えば、より複雑なプログラミング モデルが採用されており、ここで概説されている 4 つの関数をユーザーが明示的に指定する必要があります: http://en.wikipedia.org/wiki/MapReduce#Dataflow

import scalaz.syntax.id._

trait MapReduceModel {
  type MultiSet[T] = Iterable[T]

  // `map` must be a pure function
  def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
                              (data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = 
    data.flatMap(map)

  def shufflePhase[K2, V2](mappedData: MultiSet[(K2, V2)]): Map[K2, MultiSet[V2]] =
    mappedData.groupBy(_._1).mapValues(_.map(_._2))

  // `reduce` must be a monoid
  def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
                             (shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
    shuffledData.flatMap(reduce).map(_._2)

  def mapReduce[K1, K2, V1, V2, V3](data: MultiSet[(K1, V1)])
                                   (map: ((K1, V1)) => MultiSet[(K2, V2)])
                                   (reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)]): MultiSet[V3] =
    mapPhase(map)(data) |> shufflePhase |> reducePhase(reduce)
}

// Kinda how MapReduce works in Hadoop and Spark except `.par` would ensure 1 element gets a process/thread on a cluster
// Furthermore, the splitting here won't enforce any kind of balance and is quite unnecessary anyway as one would expect
// it to already be splitted on HDFS - i.e. the filename would constitute K1
// The shuffle phase will also be parallelized, and use the same partition as the map phase.  
abstract class ParMapReduce(mapParNum: Int, reduceParNum: Int) extends MapReduceModel {
  def split[T](splitNum: Int)(data: MultiSet[T]): Set[MultiSet[T]]

  override def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
                                       (data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = {
    val groupedByKey = data.groupBy(_._1).map(_._2)
    groupedByKey.flatMap(split(mapParNum / groupedByKey.size + 1))
    .par.flatMap(_.map(map)).flatten.toList
  }

  override def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
                             (shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
    shuffledData.map(g => split(reduceParNum / shuffledData.size + 1)(g._2).map((g._1, _)))
    .par.flatMap(_.map(reduce))
    .flatten.map(_._2).toList
}
于 2014-05-16T17:24:32.327 に答える