私はApache Sparkについて話すことができます。以下のコードを使用して、探していることを実行できます。ただし、この種の並列計算用には設計されていません。これは、多数のマシンに大量の並列データが分散されている場合の並列計算用に設計されています。したがって、ソリューションは少しばかげているように見えます。たとえば、単一のマシンに単一の整数を分散させるためです ( for f(1)
)。
また、Spark はすべてのデータに対して同じ計算を実行するように設計されています。そのため、実行g1()
とg2()
並行は設計に少し反します。(ご覧のとおり、可能ですが、エレガントではありません。)
// Distribute the input (1) across 1 machine.
val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
// Run f() on the input, collect the results and take the first (and only) result.
val fx = rdd1.map(f(_)).collect.head
// The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
// Run g1() on one machine, g2() on the other.
val gxs = rdd2.map {
case (1, x) => g1(x)
case (2, x) => g2(x)
}.collect
val g1x = gxs(0)
val g2x = gxs(1)
// Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head
Spark コードがRDDの概念に基づいていることがわかります。RDD は配列に似ていますが、複数のマシンに分割されている点が異なります。sc.parallelize()
ローカル コレクションからそのような並列コレクションを作成します。たとえばrdd2
、上記のコードはローカル コレクションから作成され、Seq((1, fx), (2, fx))
2 台のマシンに分割されます。1 台のマシンには がありSeq((1, fx))
、もう 1 台には がありますSeq((2, fx))
。
次に、RDDで変換を行います。map
各要素に関数を適用して同じ長さの新しい RDD を作成する一般的な変換です。(Scala の と同じmap
です。)map
実行するはandにrdd2
置き換え(1, x)
られます。したがって、一方のマシンでは実行され、もう一方のマシンでは実行されます。g1(x)
(2, x)
g2(x)
g1()
g2()
変換は、結果にアクセスする必要がある場合にのみ遅延して実行されます。結果にアクセスするメソッドはアクションと呼ばれます。最も単純な例はcollect
、RDD 全体の内容をクラスターからローカル マシンにダウンロードする です。(とは正反対ですsc.parallelize()
。)
bin/spark-shell
Spark をダウンロードして startを実行し、関数定義と上記のコードをシェルにコピーすると、これらすべてを試すことができます。