F#には次のコードがあります。これは、マシンの4つのコアを利用するのに十分な並行性があると考えられます。ただし、CPU使用率は1つのコアに制限されています。
member x.Solve problemDef =
use flag = new ManualResetEventSlim(false)
let foundSoFar = MSet<'T>()
let workPile = MailboxProcessor<seq<'T>>.Start(fun inbox ->
let remaining = ref 0
let rec loop() = async {
let! data = inbox.Receive()
let data = data |> Seq.filter (not << foundSoFar.Contains) |> Array.ofSeq
foundSoFar.UnionWith data
let jobs = ref -1
for chunk in data |> Seq.distinct |> Seq.chunked 5000 do
Async.Start <| async {
Seq.collect problemDef.generators chunk
|> Array.ofSeq
|> inbox.Post
}
incr jobs
remaining := !remaining + !jobs
if (!remaining = 0 && !jobs = -1) then
flag.Set() |> ignore
else
return! loop()
}
loop()
)
workPile.Post problemDef.initData
flag.Wait() |> ignore
foundSoFar :> seq<_>
MailboxProcessorをワークパイルとして使用し、そこから要素のチャンクを取得し、HashSetでフィルター処理して、結果がワークパイルに挿入される新しい要素を使用してタスクを作成します。これは、新しい要素が生成されなくなるまで繰り返されます。このコードの目的は、チャンクをワークパイルに非同期的に挿入し、タスクを使用することです。私の問題は、並列処理がないことです。
編集:@ jon-harropのおかげで、seqの怠惰な性質に起因する並行性の問題を解決し、提案に従ってコードを書き直しました。エージェントのメッセージタイプとして識別された共用体を使用せずに(要求メッセージをサポートするために)ManualResetEventを取り除く方法はありますか?