4

私の状況では、計算のリストが大きすぎてスレッドが多すぎて生成されるため、基本的に、以下を制限されたスレッドソリューションに変更したいと考えています。より少ないスレッドでパフォーマンスを実験して測定したいと考えています。

// the trivial approach (and largely my current situation)
let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)   // longest thread will run 1 sec
        return i * i                // some complex calculation returning a certain type
        })
    |> Async.Parallel
    |> Async.RunSynchronously       // works, total wall time 1s

私の新しいアプローチであるこのコードは、Tomas Petricek からのこのオンライン スニペットから借用/着想を得ています(私がテストしたところ、動作しますが、単位ではなく値を返す必要があります)。

type LimitAgentMessage = 
  | Start of Async<int> *  AsyncReplyChannel<int>
  | Finished

let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async {

    let queue = System.Collections.Generic.Queue<_>()
    let count = ref 0
    while true do
        let! msg = inbox.Receive() 
        match msg with 
        | Start (work, reply) -> queue.Enqueue((work, reply))
        | Finished -> decr count
        if count.Value < limit && queue.Count > 0 then
            incr count
            let work, reply = queue.Dequeue()
            // Start it in a thread pool (on background)
            Async.Start(async { 
                let! x = work 
                do! async {reply.Reply x }
                inbox.Post(Finished) 
            }) 
  })


// given a synchronous list of tasks, run each task asynchronously, 
// return calculated values in original order
let worker lst = 
    // this doesn't work as expected, it waits for each reply
    let agent = threadingLimitAgent 10
    lst 
    |> List.map(fun x ->            
        agent.PostAndReply(
            fun replyChannel -> Start(x, replyChannel)))

これで、元のコードは次のようになります。

let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)   // longest thread will run 1 sec
        return i * i                // some complex calculation returning a certain type
        })
    |> worker       // worker is not working (correct output, runs 5.5s)

全体として、出力は正しい (応答を計算て伝搬する) が、(限られたセットの) スレッドでは正しくない。

私は少し遊んでいましたが、明らかなことを見逃していると思います (さらに、誰かが、計算を順番に返す制限されたスレッドのメールボックス プロセッサのアイデアを好むかもしれません)。

4

1 に答える 1

5

問題は への呼び出しagent.PostAndReplyです。PostAndReply作業が完了するまでブロックします。これを内部で呼び出すList.mapと、作業が順次実行されます。PostAndAsyncReply1 つの解決策は、ブロックせず、結果を取得するための非同期ハンドルを返す whichを使用することです。

let worker lst = 
    let agent = threadingLimitAgent 10
    lst 
    |> List.map(fun x ->            
        agent.PostAndAsyncReply(
            fun replyChannel -> Start(x, replyChannel)))
    |> Async.Parallel

let doWork() = 
    [1 .. 10]
    |> List.map (fun i -> async { 
        do! Async.Sleep (100 * i)  
        return i * i               
        })
    |> worker      
    |> Async.RunSynchronously

もちろん、これは可能な解決策の 1 つにすぎません (すべての非同期ハンドルを取得し、それらを並行して待機する)。

于 2016-09-04T16:19:27.320 に答える