F# エージェントについて学んでいます ( MailboxProcessor
)。
私はかなり型破りな問題に取り組んでいます。
dataSource
ストリーミング データのソースである1 つのエージェント ( ) があります。データは一連のエージェント (dataProcessor
) によって処理される必要があります。dataProcessor
ある種の追跡装置と考えることができます。dataProcessor
が入力を処理できる速度よりも速くデータが流れ込む場合があります。- 多少の遅れがあってもOKです。ただし、エージェントがその作業を常に把握し、時代遅れの監視が重ならないようにする必要があります。
この問題に対処する方法を模索しています。
最初のアイデアは、スタック(LIFO)を に実装することdataSource
です。データを受信して処理できるようdataSource
になったときに、利用可能な最新の観測を送信します。このソリューションは機能する可能性がありますが、ブロックして再アクティブ化する必要があるdataProcessor
ため、複雑になる可能性があります。dataProcessor
とそのステータスを に通信するdataSource
ため、双方向通信の問題が発生します。この問題はblocking queue
、消費者と生産者の問題に要約される可能性がありますが、私にはわかりません..
2 番目のアイデアは dataProcessor
、メッセージの並べ替えを処理することです。このアーキテクチャでは、 は更新をのキューdataSource
に投稿するだけです。キューで利用可能な最新のデータをフェッチするために使用します。これが進むべき道かもしれません。ただし、現在の設計でメッセージのキューをクリアして、古い古いメッセージを削除できるかどうかはわかりません。さらに、ここでは、次のように書かれています。dataProcessor
dataProcessor
Scan
MailboxProcessor
残念ながら、現在のバージョンの F# の TryScan 関数は 2 つの点で壊れています。まず、要点はタイムアウトを指定することですが、実装は実際にはそれを尊重しません。具体的には、無関係なメッセージがタイマーをリセットします。第 2 に、他の Scan 関数と同様に、メッセージ キューはロックされた状態で検査されます。これにより、任意の長い時間になる可能性があるスキャンの間、他のスレッドがポストすることを防止できます。その結果、TryScan 関数自体が並行システムをロックアップする傾向があり、呼び出し元のコードがロック内で評価されるため、デッドロックが発生することさえあります (たとえば、関数の引数から Scan または TryScan へのポストは、ロックの下のコードが待機をブロックしているときにエージェントをデッドロックする可能性があります)。すでに下にあるロックを取得します)。
最新の観測値が跳ね返ることが問題になる場合があります。この投稿の著者である @Jon Harrop は、次のように提案しています。
私はそれを中心に設計することに成功し、結果として得られたアーキテクチャは実際にはより優れたものになりました. 本質的に、私は熱心に
Receive
すべてのメッセージを処理し、独自のローカル キューを使用してフィルタリングします。
このアイデアは確かに検討する価値がありますが、コードをいじり始める前に、ソリューションをどのように構築できるかについていくつかの意見を歓迎します。
ありがとうございました。