1

次のような状況があります。

let mutable stopped = false

let runAsync() = async {
    while not stopped do
        let! item = fetchItemToProcessAsync
        match item with
        | Some job -> job |> runJobAsync |> Async.Start
        | None -> do! Async.Sleep(1000)
}

let run() = Async.Start runAsync
let stop() =
    stopped <- true

stop メソッドが呼び出されると、DB からのそれ以上の項目の読み取りを停止し、現在開始されている項目が終了するのを待ってから、この関数から戻る必要があります。

これを達成するための最良の方法は何ですか?私はカウンターを使用して(連動APIを使用)、カウンターが0に達したときにstopメソッドから戻ることを考えていました。

これを達成する別の方法がある場合は、ガイダンスをいただければ幸いです。ここでエージェントを使用できると思いますが、エージェントを使用してこれを実現する方法があるかどうか、またはジョブの実行が完了したことを判断するためにカスタム ロジックを作成する必要があるかどうかはわかりません。

4

2 に答える 2

1

fssnip.net でこのスニペットを確認してください。これは、使用できる汎用ジョブ プロセッサです。

于 2011-08-18T13:07:21.647 に答える
1

アクターベースのパターンと MailboxProcessor を見てみましょう

基本的に、それは非同期キューとして想像できます。実行中のリスト ( Async.StartChildまたはAsync.StartAsTaskで開始) を MailboxProcessor 内のループのパラメーターとして使用すると、待機または CancellationToken を介してシャットダウンを適切に処理できます)。

ここに私がまとめた簡単なサンプルがあります:


type Commands = 
    | RunJob of Async
    | JobDone of int
    | Quit of AsyncReplyChannel

type JobRunner() =
    let processor =
        MailboxProcessor.Start (fun inbox ->
            let rec loop (nextId, jobs) = async {
                let! cmd = inbox.Receive()
                match cmd with
                | Quit cb ->
                    if not (Map.isEmpty jobs) 
                    then async {
                            do! Async.Sleep 100
                            inbox.Post (Quit cb)}
                        |> Async.Start
                        return! loop (nextId, jobs)
                    else 
                        cb.Reply()
                        return ()
                | JobDone id ->
                    return! loop (nextId, jobs |> Map.remove id)
                | RunJob job ->
                    let runJob i = async {
                        do! job
                        inbox.Post (JobDone i)
                    }
                    let! child = Async.StartChild (runJob nextId)
                    return! loop (nextId+1, jobs |> Map.add nextId child)
            }
            loop (0, Map.empty))
    member jr.PostJob(job) = processor.Post (RunJob job)
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)

let postWaitJob (jobRunner : JobRunner) time =
    let job = async {
        do! Async.Sleep time
        printfn "sleept for %d ms" time }
    jobRunner.PostJob job

let testRun() =
    let jr = new JobRunner()
    printfn "starting jobs..."
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
    printfn "sending quit"
    jr.Quit()
    printfn "done!"

うーん...ここでエディターにいくつか問題があります:パイプバック演算子を使用すると、多くのコードが殺されます...grrr

簡単な説明: ご覧のとおり、次の空きジョブ ID と Id->AsyncChild ジョブのマップを内側のループに常に提供します。(もちろん、他の/より良い解決策を実装できます。この例ではマップは必要ありませんが、コマンド「Cancell JobNr」またはこの方法で拡張できます) Job done メッセージは、このマップからジョブを削除するために内部的にのみ使用されます Quitマップが空かどうかを確認するだけです - 追加の作業が必要ない場合は、Mailboxprocessor が終了します (return ()) - 空でない場合は、新しい Async-Child が開始され、100 ミリ秒待機してから Quit-Message RunJob を再送信しますかなり単純です - 指定されたジョブを JobDone の投稿で MessabeboxProcessor にチェーンし、更新された値で再帰呼び出しループを行うだけです (nextId は 1 つ上にあり、新しい Job は古い nextId にマップされます)。

于 2011-08-18T06:45:21.287 に答える