13

問題

ここで説明するコードは F# で作成しましたが、.NET 4 フレームワークに基づいており、特に F# の特殊性に依存しているわけではありません (少なくともそう思われます!)。

ディスクにいくつかのデータがあり、ネットワークから更新して、最新バージョンをディスクに保存する必要があります。

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

問題は、loadAndSaveAndUpdateすべてのデータに対して、関数を度も実行する必要があることです。

{1 .. 5000} |> loadAndSaveAndUpdate

各ステップは

  • いくらかのディスク IO、
  • いくつかのデータクランチ、
  • いくらかのネットワーク IO (多くの遅延が発生する可能性あり)、
  • より多くのデータ処理、
  • およびいくつかのディスク IO。

これをある程度並行して行うとよいのではないでしょうか。残念ながら、私の読み取り機能と解析機能はどれも「非同期ワークフロー対応」ではありません。

私が思いついた最初の(あまり良くない)解決策

タスク

私が最初にしたことは、セットアップしてTask[]それらすべてを開始することでした:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

次に、CTRL + ESC を押して、使用しているスレッドの数を確認しました。15、17、...、35、...、170、... アプリケーションを強制終了するまで! 何かがうまくいかなかった。

平行

私はほとんど同じことをしましたが、使用Parallel.ForEach(...)して結果は同じでした: たくさんのスレッドとたくさんのスレッド。

機能するソリューション...一種の

次に、使用可能なタスクがなくなるまで、nスレッドのみを開始しTask.WaitAll(of them)、次にその他のスレッドを開始することにしました。n

これは機能しますが、問題は、たとえばn-1タスクの処理が完了すると、多くのネットワーク遅延のためにブロックを主張する最後のタスクを待機、待機、待機することです。これは良くない!

では、この問題にどのように対処しますか? 非同期ワークフロー (およびこの場合、非同期関数を適応させる方法)、並列拡張、奇妙な並列パターンなどを含むさまざまなソリューションを表示していただければ幸いです。

ありがとう。

4

4 に答える 4

12

ParallelOptions.MaxDegreeOfParallelism は、Parallel メソッド呼び出しによって実行される同時操作の数を制限します

于 2010-05-26T18:42:08.123 に答える
10

「非同期」を使用すると、さまざまな I/O 呼び出しが「海上」にあるときに、スレッドを燃やすことなく I/O バウンドの作業を行うことができるので、それが私の最初の提案です。通常、次の行に沿って、コードを非同期に変換するのは簡単です。

  • 各関数本体を でラップし、必要に応じasync{...}て追加しますreturn
  • を介して、まだライブラリにない I/O プリミティブの非同期バージョンを作成します。Async.FromBeginEnd
  • フォームの呼び出しを次のようlet r = Foo()に切り替えますlet! r = AsyncFoo()
  • Async.Parallel5000 個の非同期オブジェクトを、並行して実行される単一の非同期オブジェクトに変換するために使用します

これを行うためのさまざまなチュートリアルがあります。そのような Web キャストの 1 つがここにあります

于 2010-01-04T23:14:49.520 に答える
7

個々のタスクがタイムリーに完了していると確信していますか? 両方Parallel.ForEachTaskクラスが既に .NET スレッドプールを使用していると思います。タスクは通常、存続期間の短い作業項目である必要があります。その場合、スレッドプールは少数の実際のスレッドのみを生成しますが、タスクが進行しておらず、他のタスクがキューに入れられている場合、使用されるスレッドの数は着実に増加します。最大 (デフォルトでは250/プロセッサー).NET 2.0 SP1 では異なりますが、フレームワークのバージョンによって異なります)。また、(少なくとも .NET 2.0 SP1 では) 新しいスレッドの作成が 1 秒あたり 2 つの新しいスレッドに抑制されていることにも注意してください。時間(そのため、責任を特定するのは完全に正確ではない場合がありますParallel.ForEach)。

ワークフローを使用するという Brian の提案は、IO が完了するまでスレッドをスレッドプールに戻すasyncため、特に長命のタスクのソースが IO である場合に適していると思います。async別のオプションは、タスクがすぐに完了しないことを単純に受け入れ、多くのスレッドの生成を許可することです (これは を使用してある程度制御できますSystem.Threading.ThreadPool.SetMaxThreads) - 状況によっては、使用していることが大したことではない場合がありますたくさんのスレッド。

于 2010-01-07T23:24:18.710 に答える
0

いつでも使用できますThreadPool

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

基本的:

  1. スレッド プールを作成する
  2. スレッドの最大数を設定する
  3. を使用してすべてのタスクをキューに入れますQueueUserWorkItem(WaitCallback)
于 2010-01-04T23:23:32.937 に答える