.NET で同時実行がどのように機能するかを理解していないことで、再び恥ずかしい思いをする時が来ました :P
asyncSeq 入力を受け取り、それを n 個の並列コンシューマーに配布する非同期ワークフローの作成をカプセル化できる関数を作成しようとしています。
let doWorkInParallel bufferSize numberOfConsumers consumer input = async {
let buffer = BlockingQueueAgent<_>(bufferSize)
let inputFinished = ref false
let produce seq = async {
do! seq |> AsyncSeq.iterAsync (Some >> buffer.AsyncAdd)
do! buffer.AsyncAdd None
}
let consumeParallel degree f = async {
let consume f = async {
while not <| !inputFinished do
try
let! data = buffer.AsyncGet(waitTimeout)
match data with
| Some i -> do! f i
// whichever consumer gets the end of the input flips the inputFinished ref so they'll all stop processing
| None -> inputFinished := true
with | :? TimeoutException -> ()
}
do! [for slot in 1 .. degree -> consume f ]
|> Async.Parallel
|> Async.Ignore
}
let startProducer = produce input
let startConsumers = consumeParallel numberOfConsumers consumer
return! [| startProducer; startConsumers |] |> Async.Parallel |> Async.Ignore
}
そして、このようなコードでテストして、作成が速いシーケンスをシミュレートしましたが、各アイテムの作業には時間がかかります.
let input = seq {1..50} |> AsyncSeq.ofSeq
let action i = async {
do! Async.Sleep 500
printfn "%d GET" i
}
let test = doWorkInParallel 10 2 action input
Async.RunSynchronously test
EDIT:私は最初の問題を修正したようです(ええと、EventWaitHandleで何をしていたのか本当に考えていませんでした. F#で。