特定の順序で実行する必要がIterable
ある「作業単位」があり、互いに干渉することなく簡単に並行して実行できます。
残念なことに、一度に実行しすぎると、使用可能な RAM を超えるので、常に少数しか同時に実行しないようにする必要があります。
最も基本的には、この型シグネチャの関数が必要です。
parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]
そのため、出力Iterator
は必ずしも入力と同じ順序であるとは限りません (結果がどこから来たかの知識を維持したい場合は、入力または何かとのペアを出力できます)このタスクで可能な限り多くの並列処理を維持しながら、マシンのすべてのメモリを使い果たします。
さらに、機能をできるだけ効率的にしたいと考えています。私が最初に思いついたのは、たとえば次のようなことをすることでした。
xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)
toSet
ここで、 Scala の並列コレクションに、イテレータから要素の準備が整うとすぐに任意の順序で要素の生成を開始できることを通知し、grouped
呼び出しは同時ワーカーの数を制限することを望んでいました。残念ながら、呼び出しが望ましい効果を達成しているようには見えません(私の実験では、呼び出しtoSet
がなかった場合と同じ順序で結果が返されます)。par
grouped
呼び出しは最適ではありません。たとえば、グループ サイズが 100 で、そのうちの 99 個のジョブが 12 個のコアですぐに完了するが、そのうちの 1 つが特に遅い場合、残りのコアのほとんどは、次のグループに移動できるまでアイドル状態になります。最大で私のチャンクサイズと同じ大きさの「適応ウィンドウ」を持つ方がはるかにきれいですが、遅いワーカーに遅れることはありません。
ワークスティーリング (デ) キューなどを使用して、このようなものを自分で作成することを想像できますが、並行プリミティブを処理するための多くの困難な作業は、Scala の並列処理のあるレベルで既に行われていると思います。コレクション ライブラリ。この機能を構築するために再利用できる部分を知っている人はいますか、またはそのような操作を実装する方法について他の提案がありますか?