0

F#には次のコードがあります。これは、マシンの4つのコアを利用するのに十分な並行性があると考えられます。ただし、CPU使用率は1つのコアに制限されています。

    member x.Solve problemDef =
        use flag = new ManualResetEventSlim(false)
        let foundSoFar = MSet<'T>()
        let workPile = MailboxProcessor<seq<'T>>.Start(fun inbox ->
            let remaining = ref 0
            let rec loop() = async {
                let! data = inbox.Receive()
                let data = data |> Seq.filter (not << foundSoFar.Contains) |> Array.ofSeq
                foundSoFar.UnionWith data
                let jobs = ref -1
                for chunk in data |> Seq.distinct |> Seq.chunked 5000 do
                    Async.Start <| async {
                        Seq.collect problemDef.generators chunk
                        |> Array.ofSeq
                        |> inbox.Post
                    }
                    incr jobs
                remaining := !remaining + !jobs
                if (!remaining = 0 && !jobs = -1) then
                    flag.Set() |> ignore
                else 
                    return! loop()
            }
            loop()
        )
        workPile.Post problemDef.initData
        flag.Wait() |> ignore
        foundSoFar :> seq<_>

MailboxProcessorをワークパイルとして使用し、そこから要素のチャンクを取得し、HashSetでフィルター処理して、結果がワークパイルに挿入される新しい要素を使用してタスクを作成します。これは、新しい要素が生成されなくなるまで繰り返されます。このコードの目的は、チャンクをワークパイルに非同期的に挿入し、タスクを使用することです。私の問題は、並列処理がないことです。

編集:@ jon-harropのおかげで、seqの怠惰な性質に起因する並行性の問題を解決し、提案に従ってコードを書き直しました。エージェントのメッセージタイプとして識別された共用体を使用せずに(要求メッセージをサポートするために)ManualResetEventを取り除く方法はありますか?

4

2 に答える 2

2

完全な例がないと、コードが何をするのかを理解するのが非常に難しいことがわかりました (おそらく、かなりの数の異なる並行プログラミング プリミティブを組み合わせているため、理解するのが少し難しくなっているからでしょう)。

いずれにせよ、 の本体はMailboxProcessor1 回だけ実行されます (単純なエージェントを使用して並行性を確保したい場合は、複数のエージェントを開始する必要があります)。problemDef.generatorsエージェントの本体で、各に対して実行されるタスクを開始しますchunk

これはproblemDef.generators、並行して実行する必要があることを意味します。foundSoFar.Containsただし、 and とfoundSoFar.UnionWithasを呼び出すコードSeq.distinctは、常に順番に実行されます。

したがって、problemDef.generatorsが単純で効率的な関数である場合、トラッキングfoundSoFar(順次実行される) のオーバーヘッドは、おそらく並列化によって得られるものよりも大きくなります。

私は に慣れていませんが、それがスレッドセーフな可変セットである場合 (またはそれを置き換えた場合)、ユニオンの一部を(他のユニオンと並行して) でMSet<'T>実行できるはずです。Task.StartNew

PS: 先ほど言ったように、コードを実行しないとわかりにくいので、私の考えは完全に間違っている可能性があります。

于 2013-01-07T16:18:25.490 に答える
1

ManualResetEventSlim 非常に悪い高レベルの同時実行プリミティブ (タスクとエージェント) を混在させています。代わりに使えますPostAndReplyか?

生成されたタスクで「作業」を行うために使用Seqしているため、遅延が発生するため、ポストバックされるまで実際には何もしません。のようなものでタスク内で評価を強制できますArray.ofSeqか?

使い方Taskが異常です。に切り替える方が慣用的かもしれませんAsync.Start

完全な解決策がなければ、私の推測を検証することはできません...

4つのコアを利用するのに十分な同時性があると思います

マルチコア並列処理に関するあなたのメンタル モデルは、かなり的外れかもしれません。

于 2013-01-08T08:47:05.700 に答える