0

次の関数オブジェクトが与えられると、

val f : Int => Double = (i:Int) => i + 0.1

val g1 : Double => Double = (x:Double) => x*10

val g2 : Double => Double = (x:Double) => x/10

val h : (Double,Double) => Double = (x:Double,y:Double) => x+y

たとえば、3 つのリモート サーバーまたはノード (IP xxx.xxx.xxx.1、IP 2 および IP 3)、このプログラムの実行を分散する方法、

val fx = f(1)
val g1x = g1( fx )
val g2x = g2( fx )
val res = h ( g1x, g2x )

となることによって

  • fxIP 1 で計算されます。
  • g1xIP 2 で計算されます。
  • g2xIP 3 で計算されます。
  • resIP 1 で計算されます

Scala Akka や Apache Spark は、これに対する簡単なアプローチを提供できますか?

アップデート

  • @pkinsky によって提案されたRPC (リモート プロシージャ コール) Finagleは、実行可能な選択かもしれません。
  • ロード バランシング ポリシーは、実行するノードを選択するためのメカニズムであると考えてください
4

1 に答える 1

1

私はApache Sparkについて話すことができます。以下のコードを使用して、探していることを実行できます。ただし、この種の並列計算用には設計されていません。これは、多数のマシンに大量の並列データが分散されている場合の並列計算用に設計されています。したがって、ソリューションは少しばかげているように見えます。たとえば、単一のマシンに単一の整数を分散させるためです ( for f(1))。

また、Spark はすべてのデータに対して同じ計算を実行するように設計されています。そのため、実行g1()g2()並行は設計に少し反します。(ご覧のとおり、可能ですが、エレガントではありません。)

// Distribute the input (1) across 1 machine.
val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
// Run f() on the input, collect the results and take the first (and only) result.
val fx = rdd1.map(f(_)).collect.head
// The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
// Run g1() on one machine, g2() on the other.
val gxs = rdd2.map {
  case (1, x) => g1(x)
  case (2, x) => g2(x)
}.collect
val g1x = gxs(0)
val g2x = gxs(1)
// Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head

Spark コードがRDDの概念に基づいていることがわかります。RDD は配列に似ていますが、複数のマシンに分割されている点が異なります。sc.parallelize()ローカル コレクションからそのような並列コレクションを作成します。たとえばrdd2、上記のコードはローカル コレクションから作成され、Seq((1, fx), (2, fx))2 台のマシンに分割されます。1 台のマシンには がありSeq((1, fx))、もう 1 台には がありますSeq((2, fx))

次に、RDDで変換を行います。map各要素に関数を適用して同じ長さの新しい RDD を作成する一般的な変換です。(Scala の と同じmapです。)map実行するはandにrdd2置き換え(1, x)られます。したがって、一方のマシンでは実行され、もう一方のマシンでは実行されます。g1(x)(2, x)g2(x)g1()g2()

変換は、結果にアクセスする必要がある場合にのみ遅延して実行されます。結果にアクセスするメソッドはアクションと呼ばれます。最も単純な例はcollect、RDD 全体の内容をクラスターからローカル マシンにダウンロードする です。(とは正反対ですsc.parallelize()。)

bin/spark-shellSpark をダウンロードして startを実行し、関数定義と上記のコードをシェルにコピーすると、これらすべてを試すことができます。

于 2014-11-04T12:42:54.973 に答える