3

特定の順序で実行する必要がIterableある「作業単位」があり、互いに干渉することなく簡単に並行して実行できます。

残念なことに、一度に実行しすぎると、使用可能な RAM を超えるので、常に少数しか同時に実行しないようにする必要があります。

最も基本的には、この型シグネチャの関数が必要です。

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]

そのため、出力Iteratorは必ずしも入力と同じ順序であるとは限りません (結果がどこから来たかの知識を維持したい場合は、入力または何かとのペアを出力できます)このタスクで可能な限り多くの並列処理を維持しながら、マシンのすべてのメモリを使い果たします。

さらに、機能をできるだけ効率的にしたいと考えています。私が最初に思いついたのは、たとえば次のようなことをすることでした。

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)

toSetここで、 Scala の並列コレクションに、イテレータから要素の準備が整うとすぐに任意の順序で要素の生成を開始できることを通知し、grouped呼び出しは同時ワーカーの数を制限することを望んでいました。残念ながら、呼び出しが望ましい効果を達成しているようには見えません(私の実験では、呼び出しtoSetがなかった場合と同じ順序で結果が返されます)。pargrouped呼び出しは最適ではありません。たとえば、グループ サイズが 100 で、そのうちの 99 個のジョブが 12 個のコアですぐに完了するが、そのうちの 1 つが特に遅い場合、残りのコアのほとんどは、次のグループに移動できるまでアイドル状態になります。最大で私のチャンクサイズと同じ大きさの「適応ウィンドウ」を持つ方がはるかにきれいですが、遅いワーカーに遅れることはありません。

ワークスティーリング (デ) キューなどを使用して、このようなものを自分で作成することを想像できますが、並行プリミティブを処理するための多くの困難な作業は、Scala の並列処理のあるレベルで既に行われていると思います。コレクション ライブラリ。この機能を構築するために再利用できる部分を知っている人はいますか、またはそのような操作を実装する方法について他の提案がありますか?

4

1 に答える 1

3

並列コレクション フレームワークを使用すると、特定のタスクに使用するスレッドの最大数を指定できます。scala-2.10 を使用すると、次のようになります。

def parMap[A,B](x : Iterable[A], f : A => B, chunkSize : Int) = {
  val px = x.par
  px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize))
  px map f
}

これにより、一度に複数のchunkSize操作が実行されるのを防ぐことができます。これは、下でワークスティーリング戦略を使用してアクターを機能させ続けるため、上記の例と同じ問題に悩まされることはありませんgrouped

ただし、この方法では、結果が最初に完了した順序に並べ替えられません。そのためには、操作をアクターに変えて、小さなアクター プールで操作を実行し、操作が完了したら結果を返すような方法をお勧めします。

于 2013-02-06T15:50:57.293 に答える