0

私はいくつかの MailboxProcessor テストを実行しようとしていますが、メールボックス Scan() が「System.Exception: 複数のメールボックスのリーダーの継続を待っています」で失敗するようです。これは、Async.Start や Async.StartImmediate などで発生します (Async.RunSynchronously も機能しません。最初の顧客の後にプロセッサが 1 つしかなく、顧客がいないためです)。

ここにデモコードがあります。これはインタラクティブに動作します:

#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----

type Customer() =
    let person = customerNames |> Seq.nth (random.Next nameAmount)
    member x.Id = Guid.NewGuid()
    member x.Name = person.Name
    member x.RequiredTime = random.Next(10000)

type Barber(name) =
    member x.Name = name

type ``Possible actions notified to barber`` = 
| CustomerWalksIn of Customer

let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })

let createBarber name = 
    Console.WriteLine("Barber " + name + " takes inital nap...")
    let rec cutSomeHairs () = 
        async{
            do! availableCustomers.Scan(function 
                | CustomerWalksIn customer ->
                    async {
                        Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
                        // exception also happen with Threading.Thread.Sleep()
                        do! Async.Sleep customer.RequiredTime
                        Console.WriteLine("Barber " + name + " finnished cutting " + customer.Name + "'s hair. Going to sleep now...")
                    } |> Some)
            do! cutSomeHairs ()
            }
    cutSomeHairs() |> Async.StartImmediate

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

createBarber "Tuomas";
createBarber "Seppo";

availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

...そして、しばらく実行した後に取得するスタックトレースは次のとおりです。

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.Sleep@1508-1.Invoke(Object state)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error

またはスレッドなしで同じ:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@511.Invoke(Object state)
Stopped due to error
4

2 に答える 2

1

ReceiveおよびScanメソッドはMailboxProcessor、エージェントの本体からのみ呼び出す必要があります。MSDN ドキュメントを引用するには:

このメソッドは、エージェントの本体内で使用するためのものです。エージェントごとに、最大で 1 つの同時リーダーをアクティブにできるため、Receive、TryReceive、Scan、または TryScan への複数の同時呼び出しをアクティブにすることはできません。スキャナー関数の本体は実行中にロックされますが、ロックは非同期ワークフローの実行前に解放されます。

したがって、コードを別の方法で構造化する必要があります。詳細な回答はありませんが、エージェントを使用したブロッキング キューの実装に関する私の記事が役立つようです。

于 2014-03-31T15:22:31.120 に答える
1

Tomas が既に指摘したように、MailboxProcessor直接は 1つのリーダーのみを許可し、非同期システム内でこの問題を解決する 1 つの方法は、独自のキューまたはメールボックス タイプを作成することです。ただし、Tomas が指摘した記事で触れられていないことの 1 つは、新しい通信プリミティブを実装する別の方法として、 andAsync.FromContinuationsではなくを使用することです。MailboxProcessorAsyncReplyChannel

を使用する主な利点は、非同期メカニズムに直接アクセスできることと、およびAsync.FromContinuationsによる制限内で作業する必要がないことです。主な欠点は、自分でキューまたはメールボックスをスレッド セーフにする必要があることです。MailboxProcessorAsyncReplyChannel

具体的な例として、Anton Tayanovskyy のブログ記事Making Async 5x Fasterには、 を使用して実装された複数のリーダー/ライター同期チャネルの実装が含まれていAsync.FromContinuationsます。

于 2014-07-19T10:23:20.517 に答える