4

2 つの関数f: String => Ag: A => Bを大きなテキスト ファイルの各行に適用して、最終的にB.

ファイルが大きくfg高価なので、処理を並行させたいと思います。「並列コレクション」を使用して次のようなことを行うことができますが、ファイルの読み取り、、および同時io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))実行は実行されません。fg

この例で同時実行を実装する最良の方法は何ですか?

4

2 に答える 2

14

最初に、重要な注意:すべてのデータをコピーする必要があるため、.paron を使用しないでください (シーケンシャルにしか読み取れないため)。代わりに、コピーせずに変換できるのようなものを使用してください。ListListVector.par

並列処理を間違った方法で考えているようです。何が起こるかは次のとおりです。

次のようなファイルがある場合:

0
1
2
3
4
5
6
7
8
9

そして機能fg

def f(line: String) = {
  println("running f(%s)".format(line))
  line.toInt
}

def g(n: Int) = {
  println("running g(%d)".format(n))
  n + 1
}

次に、次のことができます。

io.Source.fromFile("data.txt").getLines.toIndexedSeq[String].par.map(l => g(f(l)))

出力を取得します。

running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)

したがって、g(f(l))操作全体が同じスレッドで行われている場合でも、各行が並行して処理される可能性があることがわかります。したがって、多くのfandg操作が別々のスレッドで同時に発生する可能性がありますが、特定の行fのandは順番に発生します。g

結局のところ、行を読み取り、fを実行gし、並行して実行する方法は実際には存在しないため、これは想定どおりの方法です。たとえば、行がまだ読み取られていない場合、g出力に対してどのように実行できますか?f

于 2012-12-12T15:52:41.013 に答える
3

mapで使用できますFuture

val futures = io.Source.fromFile(fileName).getLines.map{ s => Future{ stringToA(s) }.map{ aToB } }.toIndexedSeq

val results = futures.map{ Await.result(_, 10 seconds) }
// alternatively:
val results = Await.result(Future.sequence(futures), 10 seconds)
于 2012-12-12T15:51:16.797 に答える