5

の使い方の例を見つけようとしTryScanましたが、何も見つかりませんでした。

私がやりたいこと (非常に単純化された例): MailboxProcessor2 種類のメッセージを受け入れる があります。

  • 最初のものGetStateは現在の状態を返します。 GetStateメッセージはかなり頻繁に送信されます

  • もう 1 つUpdateStateは非常にコストがかかります (時間がかかります)。たとえば、インターネットから何かをダウンロードし、それに応じて状態を更新します。 UpdateStateめったに呼び出されません。

私の問題は-メッセージGetStateがブロックされ、前のメッセージが提供されるまで待機することUpdateStateです。そのため、TryScanすべてのGetStateメッセージを処理するために使用しようとしましたが、うまくいきませんでした。

私のコード例:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

コードを実行しようとするとGetState、結果を待っているため、メッセージがほとんど処理されていないことがわかります。一方、UpdateState発火して忘れるだけなので、状態の取得を効果的にブロックします。

編集

私のために働く現在の解決策はこれです:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

コメントへの反応: otherMailboxProcessorまたはThreadPoolを並列に実行するアイデアUpdateStateは素晴らしいですが、現在は必要ありません。私がやりたかったのは、すべてのGetStateメッセージを処理し、その後で他のメッセージを処理することだけです。処理中UpdateStateにエージェントがブロックされてもかまいません。

出力の問題が何であったかを示します。

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent
4

3 に答える 3

4

TryScanこのシナリオでは、この方法が役立つとは思いません。メッセージの待機中に使用するタイムアウトを指定できます。メッセージが受信されると、メッセージの処理が開始されます (タイムアウトは無視されます)。

たとえば、特定のメッセージを待ちたいが、(待機中に) 毎秒別のチェックを実行する場合は、次のように記述できます。

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}

メッセージの処理中にメールボックス プロセッサがブロックされないようにするにはどうすればよいUpdateStateですか? メールボックス プロセッサは (論理的に) シングルスレッドです。おそらくメッセージの処理をキャンセルしたくないUpdateStateので、バックグラウンドで処理を開始し、処理が完了するまで待つのが最善の方法です。処理するコードはUpdateState、何らかのメッセージをメールボックスに送り返すことができます (例: UpdateStateCompleted)。

これがどのように見えるかのスケッチを次に示します。

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

バックグラウンド タスクが並行して実行されるようになったので、変更可能な状態に注意する必要があります。また、UpdateStateメッセージを処理するよりも速く送信すると、問題が発生します。これは、たとえば、前のリクエストをすでに処理しているときにリクエストを無視またはキューに入れることで修正できます。

于 2011-02-02T22:27:50.363 に答える
3

TRYSCAN を使用しないでください!!!

残念ながら、TryScan現在のバージョンの F# の関数は 2 つの点で壊れています。まず、全体のポイントはタイムアウトを指定することですが、実装は実際にはそれを尊重しません。具体的には、無関係なメッセージがタイマーをリセットします。第 2 に、他のScan関数と同様に、メッセージ キューはロックされた状態で検査されます。これにより、任意の長い時間になる可能性があるスキャンの期間中、他のスレッドが投稿するのを防ぐことができます。その結果、TryScan関数自体が並行システムをロックアップする傾向があり、呼び出し元のコードがロック内で評価されるため、デッドロックが発生する可能性さえあります (たとえば、関数引数からエージェントへのポストScanまたはTryScan、ロック下のコードが取得を待機しているブロックのときにエージェントをデッドロックする可能性があります)。ロックはすでに下にあります)。

TryScan私は製品コードの初期のプロトタイプで使用しましたが、それは問題の終わりを引き起こしませんでした. しかし、私はそれを中心に設計することができ、結果として得られたアーキテクチャは実際にはより優れていました. 本質的に、私は熱心にReceiveすべてのメッセージを処理し、独自のローカル キューを使用してフィルタリングします。

于 2011-02-03T21:26:10.357 に答える
2

Tomasが述べたように、MailboxProcessorはシングルスレッドです。状態ゲッターとは別のスレッドで更新を実行するには、別のMailboxProcessorが必要になります。

#nowarn "40"

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState

let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
    let rec loop = async {
        let! state = mbox.Receive()
        printfn "U start processing %d" !state
        // something very time consuming here...
        do! Async.Sleep 100
        printfn "U done processing %d" !state
        state := !state + 1
        do! loop
    }
    loop
)

let mbox = MailboxProcessor.Start(fun mbox ->
    // we need a mutiple state if another thread can change it at any time
    let state = ref 0

    let rec loop = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> runner_UpdateState.Post state
        | GetState chnl -> chnl.Reply !state

        return! loop 
    }
    loop)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

出力:

U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10

ThreadPoolを使用することもできます。

open System.Threading

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int
    | UpdateState

let mbox = MailboxProcessor.Start(fun mbox ->
    let rec loop state = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> 
            ThreadPool.QueueUserWorkItem((fun obj -> 
                let state = obj :?> int

                printfn "U start processing %d" state
                Async.Sleep 100 |> Async.RunSynchronously
                printfn "U done processing %d" state
                mbox.Post(SetState(state + 1))

                ), state)
            |> ignore
        | GetState chnl -> 
            chnl.Reply state
        | SetState newState ->
            return! loop newState
        return! loop state
    }
    loop 0)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine()|>無視

于 2011-02-03T00:59:13.837 に答える