5

F# エージェントについて学んでいます ( MailboxProcessor)。

私はかなり型破りな問題に取り組んでいます。

  • dataSourceストリーミング データのソースである1 つのエージェント ( ) があります。データは一連のエージェント ( dataProcessor) によって処理される必要があります。dataProcessorある種の追跡装置と考えることができます。
  • dataProcessorが入力を処理できる速度よりも速くデータが流れ込む場合があります。
  • 多少の遅れがあってもOKです。ただし、エージェントがその作業を常に把握し、時代遅れの監視が重ならないようにする必要があります。

この問題に対処する方法を模索しています。

最初のアイデアは、スタック(LIFO)を に実装することdataSourceです。データを受信して​​処理できるようdataSourceになったときに、利用可能な最新の観測を送信します。このソリューションは機能する可能性がありますが、ブロックして再アクティブ化する必要があるdataProcessorため、複雑になる可能性があります。dataProcessorとそのステータスを に通信するdataSourceため、双方向通信の問題が発生します。この問題はblocking queue消費者と生産者の問題に要約される可能性がありますが、私にはわかりません..

2 番目のアイデアdataProcessor、メッセージの並べ替えを処理することです。このアーキテクチャでは、 は更新をのキューdataSourceに投稿するだけです。キューで利用可能な最新のデータをフェッチするために使用します。これが進むべき道かもしれません。ただし、現在の設計でメッセージのキューをクリアして、古い古いメッセージを削除できるかどうかはわかりません。さらに、ここでは、次のように書かれています。dataProcessordataProcessorScanMailboxProcessor

残念ながら、現在のバージョンの F# の TryScan 関数は 2 つの点で壊れています。まず、要点はタイムアウトを指定することですが、実装は実際にはそれを尊重しません。具体的には、無関係なメッセージがタイマーをリセットします。第 2 に、他の Scan 関数と同様に、メッセージ キューはロックされた状態で検査されます。これにより、任意の長い時間になる可能性があるスキャンの間、他のスレッドがポストすることを防止できます。その結果、TryScan 関数自体が並行システムをロックアップする傾向があり、呼び出し元のコードがロック内で評価されるため、デッドロックが発生することさえあります (たとえば、関数の引数から Scan または TryScan へのポストは、ロックの下のコードが待機をブロックしているときにエージェントをデッドロックする可能性があります)。すでに下にあるロックを取得します)。

最新の観測値が跳ね返ることが問題になる場合があります。この投稿の著者である @Jon Harrop は、次のように提案しています。

私はそれを中心に設計することに成功し、結果として得られたアーキテクチャは実際にはより優れたものになりました. 本質的に、私は熱心にReceiveすべてのメッセージを処理し、独自のローカル キューを使用してフィルタリングします。

このアイデアは確かに検討する価値がありますが、コードをいじり始める前に、ソリューションをどのように構築できるかについていくつかの意見を歓迎します。

ありがとうございました。

4

3 に答える 3

1

tl;dr私はこれを試してみます: FSharp.Actor または Zach Bray のブログ投稿からメールボックスの実装を取得し、ConcurrentQueue を ConcurrentStack に置き換え (さらにいくつかの制限された容量ロジックを追加)、この変更されたエージェントをディスパッチャとして使用して、dataSource から大量のメッセージを渡します。通常の MBP またはアクターとして実装された dataProcessors。

tl;dr2ワーカーが不足して遅いリソースであり、ワーカーの準備ができた時点で最新のメッセージを処理する必要がある場合、すべてはキューではなくスタックを持つエージェントに要約されます (いくつかの制限付き) capacity ロジック) とワーカーの BlockingQueue です。Dispatcher は、準備が整ったワーカーをデキューし、スタックからメッセージをポップして、このメッセージをワーカーに送信します。ジョブが完了した後、ワーカーは準備が整うとキューに入れます (例: 前let! msg = inbox.Receive())。ディスパッチャ コンシューマ スレッドはワーカーの準備が整うまでブロックしますが、プロデューサ スレッドはバインドされたスタックを更新したままにします。(バインドされたスタックは、ロック内の配列 + オフセット + サイズで実行できます。以下は複雑すぎます)

詳細

MailBoxProcessor は、コンシューマーを 1 つだけ持つように設計されています。これは、MBP のソース コードでもコメントされています(「DRAGONS」という単語を検索してください:))。

データを MBP に投稿すると、1 つのスレッドのみが内部キューまたはスタックから取得できます。特定のユースケースでは、 ConcurrentStackを直接使用するか、BlockingCollectionにラップすることをお勧めします。

  • 多くの同時消費者を許可します
  • 非常に高速でスレッドセーフです
  • BlockingCollectionBoundedCapacityコレクションのサイズを制限できるプロパティがあります。スローしAddますが、キャッチするか使用できますTryAdd。A がメイン スタックで B がスタンバイの場合、TryAddA に、Falseで B に、その 2 つをInterlocked.ExchangeAddと交換し、A で必要なメッセージを処理し、クリアして、新しいスタンバイを作成します。処理する場合は 3 つのスタックを使用します。 A は、B が再びいっぱいになるよりも長くなる可能性があります。このようにして、メッセージをブロックしたり失ったりすることはありませんが、不要なメッセージを破棄することは制御された方法です。

BlockingCollection には、BlockingCollection の配列で機能する AddToAny/TakeFromAny などのメソッドがあります。これは次のように役立ちます。

  • dataSource は、ConcurrentStack 実装 (BCCS) を使用して BlockingCollection にメッセージを生成します
  • 別のスレッドが BCCS からのメッセージを消費し、それらを処理中の BCCS の配列に送信します。データが多いとおっしゃいました。1 つのスレッドを犠牲にして、メッセージを無期限にブロックおよびディスパッチすることができます
  • 各処理エージェントは、独自の BCCS を持っているか、ディスパッチャがメッセージを投稿するエージェント/アクター/MBP として実装されています。あなたのケースでは、メッセージを 1 つの processorAgent にのみ送信する必要があるため、処理エージェントを循環バッファーに格納して、常に使用頻度の低いプロセッサーにメッセージをディスパッチすることができます。

このようなもの:

            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

ConcurrentStack の代わりに、ヒープ データ構造について読みたいと思うかもしれません。スタックに到着する順序ではなく、タイムスタンプなどのメッセージのプロパティによって最新のメッセージが必要な場合 (たとえば、転送と到着順序 <> 作成順序に遅延がある場合)、最新のメッセージを取得できます。ヒープを使用したメッセージ。

それでもエージェントのセマンティクス/API が必要な場合は、Dave のリンクに加えていくつかのソースを読んで、どうにかして複数の同時コンシューマーへの実装を採用できます。

  • 効率的なアクターの実装に関する Zach Bray による興味深い記事。// Might want to schedule this call on another thread.そこで、(コメントの下で)行execute trueを行などに置き換える必要があります。async { execute true } |> Async.Startそうしないと、生成スレッドがスレッドを消費するためです。単一の高速プロデューサーには適していません。ただし、上記のようなディスパッチャーの場合、これはまさに必要なものです。

  • FSharp.Actor (aka Fakka)開発ブランチと FSharp MPB ソース コード (上記の最初のリンク) は、実装の詳細に非常に役立ちます。FSharp.Actors ライブラリは数か月間凍結されていますが、dev ブランチでいくつかのアクティビティがあります。

  • この文脈では、Google グループでのFakka に関する議論を見逃してはなりません。

私はやや似たユース ケースを持っており、過去 2 日間、F# エージェント/アクターで見つけられるすべてを調査しました。この回答は、半分は執筆中に生まれたアイデアを試すための一種の TODO です。

于 2014-01-30T00:07:53.963 に答える