5

データの並列処理の簡単な実装に Ray を使用することを検討しています。

  • ストリーム/イテレータを介して利用可能になる処理対象のデータ項目が大量にあります。各アイテムはかなりのサイズです
  • 各アイテムに対して関数を実行する必要があり、かなりのサイズの結果が生成されます
  • 処理されたデータは、ストリームで渡されるか、一定期間内に一定量のデータのみを受け入れることができるある種のシンクに保存される必要があります

これがRayでできることかどうかを知りたいです。

現在、pythons マルチプロセッシング ライブラリに基づく次の簡単な実装があります。

  • 1 つのプロセスがストリームを読み取り、アイテムをキューに渡します。キューは k 個のアイテムの後でブロックされます (キューに必要なメモリが制限を超えないようにするため)。
  • 入力キューから読み取り、アイテムを処理するワーカー プロセスがいくつかあります。処理されたアイテムは結果キューに渡されますが、これもサイズが制限されています
  • 別のプロセスが結果キューを読み取り、アイテムを渡します

これにより、ワーカーがそれ以上アイテムを処理できなくなるとすぐに、キューがブロックされ、ワー​​カーにそれ以上の作業を渡そうとしなくなります。シンク プロセスがそれ以上アイテムを格納できない場合、結果キューがブロックされ、ワー​​カーがブロックされ、ライター プロセスが再び結果を書き込めるようになるまで入力キューがブロックされます。

では、Ray にはこのようなことを行うための抽象化がありますか? 特定の量の作業のみをワーカーに渡すことができるようにするにはどうすればよいですか? また、単一プロセスの出力関数のようなものを作成し、ワーカーがその関数にメモリ/ストレージは使い果たされていますか?

4

2 に答える 2

0

このユースケースでは、Ray の並列反復子をお勧めします。最初に、ストリーミング ジェネレーターから大きなオブジェクトを取得するジェネレーターを作成し (「参考文献」を参照ray.util.iter.from_iterators())、それらの項目に対する操作を連鎖させます (「参考文献」を参照.for_each())。重要なことに、中間オブジェクト (それ自体が大きい可能性があります) は、チェーン内の次の関数によって消費されるとすぐにメモリから削除されるため、メモリ不足を防ぐことができます。

最後に、メソッドで必要に応じてデータ シンクの準備が整うまで、キューでの実行を制御できます.take()

于 2020-07-31T17:53:45.853 に答える