問題
ここで説明するコードは F# で作成しましたが、.NET 4 フレームワークに基づいており、特に F# の特殊性に依存しているわけではありません (少なくともそう思われます!)。
ディスクにいくつかのデータがあり、ネットワークから更新して、最新バージョンをディスクに保存する必要があります。
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
問題は、loadAndSaveAndUpdate
すべてのデータに対して、関数を何度も実行する必要があることです。
{1 .. 5000} |> loadAndSaveAndUpdate
各ステップは
- いくらかのディスク IO、
- いくつかのデータクランチ、
- いくらかのネットワーク IO (多くの遅延が発生する可能性あり)、
- より多くのデータ処理、
- およびいくつかのディスク IO。
これをある程度並行して行うとよいのではないでしょうか。残念ながら、私の読み取り機能と解析機能はどれも「非同期ワークフロー対応」ではありません。
私が思いついた最初の(あまり良くない)解決策
タスク
私が最初にしたことは、セットアップしてTask[]
それらすべてを開始することでした:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
次に、CTRL + ESC を押して、使用しているスレッドの数を確認しました。15、17、...、35、...、170、... アプリケーションを強制終了するまで! 何かがうまくいかなかった。
平行
私はほとんど同じことをしましたが、使用Parallel.ForEach(...)
して結果は同じでした: たくさんのスレッドとたくさんのスレッド。
機能するソリューション...一種の
次に、使用可能なタスクがなくなるまで、n
スレッドのみを開始しTask.WaitAll(of them)
、次にその他のスレッドを開始することにしました。n
これは機能しますが、問題は、たとえばn-1
タスクの処理が完了すると、多くのネットワーク遅延のためにブロックを主張する最後のタスクを待機、待機、待機することです。これは良くない!
では、この問題にどのように対処しますか? 非同期ワークフロー (およびこの場合、非同期関数を適応させる方法)、並列拡張、奇妙な並列パターンなどを含むさまざまなソリューションを表示していただければ幸いです。
ありがとう。