問題タブ [tpl-dataflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
c# - TPL データフローを使用して、事前定義されたブロックの上に再利用可能な処理ロジックを作成しますか?
TPL データフローが大好きです。
興味深い設計上の選択は、ほとんどの定義済みブロックが Delegates を使用して、処理ロジックを実装できるようにすることです。これは、単純なシナリオでは良さそうです。しかし、モジュール性とカプセル化を必要とする現実世界の大きなアプリケーションについて考えてみましょう。DELEGATE アプローチを使用して適切に構造化されたアプリを作成するのは難しく、不自然であることがわかりました。
たとえば、再利用可能なクラス TYPE (インスタンスではない) として aMultiplyIntByTwoTransformBlock
と aだけが必要な場合。NoOpActionBlock
どうすれば達成できますか?TransformBlock
/から継承して、これを実現するためにいくつかのメソッドをActionBlock
オーバーライドできるといいのですが。Process()
ただし、事前定義されたブロックは封印されています。代理人のみを受け入れます。
カスタム ブロックを最初から作成できることはわかっていますが、必要なのは事前定義されたブロックに加えて少しカスタマイズするだけなので、明らかに複雑すぎます。
では、どうすれば目標を達成できるでしょうか。
更新: デリゲートにできないことがあると言っているわけではありません。多くのシナリオでは、テンプレート メソッド パターンで抽象ブロックを公開する方が優れていると言っています。たとえば、ポリモーフィズムを利用して、AbstractMultiplyBlock と MultiplyByTwoBlock と MultiplyByThreeBlock を記述できればいいのにと思います。残念ながら、デリゲートはそのような種類のデータとロジックの再利用性を提供しません。
mono - mono の TPL.Dataflow はどこで入手できますか?
Macbook で mono-3.0.10 を使用しています。mono 3.0 のリリース ノートでは、Tpl.Dataflow について言及されており、Tpl.Dataflow のソース コードが mono の github repo master ブランチにあることにも気付きました。
しかし、ローカルのモノインストールでは見つかりませんでした。
インストーラーからモノを再インストールしようとしましたが、うまくいきません。
次に、nuget からダウンロードした dll を試してみましたが、次のエラーが発生します。
nuget、net45、netcore45、portable-net45 の 3 つのエディションがあります。私はそれらをすべて試しましたが、それでもうまくいきません。
では、Tpl.Dataflow が mono で動作する最新の適切な更新はどこで見つけることができますか?
.net - タスクベースの非同期パターン (TAP) を使用した効率的なソケット I/O
私は、中央サーバーへの通信に raw TCP ソケットを使用するクライアント アプリケーションに取り組んでいます。アプリケーション メッセージはシリアル化され、TCP ストリームに渡されるフレームを作成するために長さがプレフィックスされます。
これを処理するための古典的な方法の 1 つは、ソケット クラスで Receive または BeginReceive を直接呼び出し、コールバックでメッセージを逆シリアル化し、メッセージを別のスレッドで処理するために別のキューに渡し、コールバックにソケットで別の受信を再度開始させることです。
このアプローチの素朴な実装は私にとって理想的ではありません.メッセージのシリアライゼーションとデシリアライゼーションをソケットに密接に結合し、キューが異なるスレッド/コールバックでうまく機能するようにするにはかなりの「配管」が必要です. また、これはやや漏れやすい抽象化でもあります。呼び出し元のコードには、入力メッセージと出力メッセージの「データ フロー」ではなく、基礎となるソケットの知識が必要です。
完全に .NET 4.5 内で作業していることを考えると、TPL (TaskFactory.FromAsync) を使用してソケットの Begin および End 非同期メソッドをラップすることは当然の選択です。ただし、いくつかの理由により、この時点からどのように進めればよいかわかりません。
- データを受信するために完了しない非同期の「タスク」が必要です。ソケットが接続されている限り、メッセージのストリームを処理したいと思います。中断 (切断、ソケット エラー、またはキャンセル要求) は、従来のタスクの完了ではなく、例外になります。Stephen Toub ( http://blogs.msdn.com/b/pfxteam/archive/2011/10/02/10218999.aspx ) によると、私は常にタスクを完了する必要があります。これにより、少し問題が生じます。従来の意味では、ソケットの受信は決して完了しません。Stephen は、「Awaiting Socket Operations」の投稿で少し矛盾しているように見えます。そこでは、ソケット エラーなしでは決して完了しないソケット読み取りを示しています ( http://blogs.msdn.com/b/pfxteam/archive/2011/12/15 /10248293.aspx )。
- 送信するデータを同期的に「キューに入れる」方法が必要です。発信者は、送信されるメッセージをブロックせずに送信できる必要があり、メッセージはソケットを介して順次送信される必要があります。つまり、メッセージのフレーミングにより、ソケット自体で一度に 1 つの送信のみが行われます。TPL データフローは適していますか、それとも別のキューイング パターンを使用する必要がありますか?
- メッセージのシリアル化とメッセージの送信の間の懸念を明確に分離したいと思います。
このタイプの戦略の例はあまり見たことがなく、「直接的な」ソケット I/O または単純な実装のみです。私の直感では、シリアライゼーションとデシリアライゼーションをパイプライン処理できることを考えると、TPL Dataflow が適していることがわかります。
Receive タスクの効果的に無限のチェーンを TPL Dataflow などに橋渡しする方法がわかりません。
何か案は?
c# - ITargetBlock 内でポリシーを再試行する
ワークフローに再試行ポリシーを導入する必要があります。このように接続された 3 つのブロックがあるとします。
そのため、データを蓄積するバッファがあり、一度に 3 つ以下のアイテムを処理する変換ブロックに送信し、結果をアクション ブロックに送信します。
潜在的に変換ブロックの処理中に一時的なエラーが発生する可能性があります。エラーが一時的なものである場合は、ブロックを数回再試行します。
ブロックは一般的に再試行できないことを私は知っています (ブロックに渡されたデリゲートは再試行可能にすることができます)。オプションの 1 つは、渡されたデリゲートをラップして再試行をサポートすることです。
TransientFaultHandling.Core
また、一時的な障害に対する再試行メカニズムを提供する非常に優れたライブラリがあることも知っています。これは優れたライブラリですが、私の場合はそうではありません。変換ブロックに渡されたデリゲートをRetryPolicy.ExecuteAsync
メソッドにラップすると、変換ブロック内のメッセージがロックされ、再試行が完了するか失敗するまで、変換ブロックは新しいメッセージを受信できなくなります。3 つのメッセージすべてが再試行に入力され (たとえば、次の再試行は 2 分後に行われるとします)、失敗した場合、少なくとも 1 つのメッセージが変換ブロックを離れるまで、変換ブロックは停止します。
私が見る唯一の解決策は、を拡張しTranformBlock
(実際にITargetBlock
は十分です)、手動で再試行することです(ここからのように):
ig を使用して、メッセージを変換ブロック内に遅延して再度配置しますが、この場合、再試行コンテキスト (残りの再試行回数など) もこのブロックに渡す必要があります。あまりにも複雑に聞こえます...
ワークフロー ブロックの再試行ポリシーを実装するためのより簡単なアプローチを見た人はいますか?
c# - 再試行可能なブロックの正しい完了を実装する
Teaser : 皆さん、この質問は再試行ポリシーを実装する方法に関するものではありません。これは、TPL Dataflow ブロックの正しい完了に関するものです。
この質問は、以前の質問ITargetBlock 内の再試行ポリシーの続きです。TransformBlock
この質問に対する答えは、 (ソース) とTransformManyBlock
(ターゲット)を利用する @svick のスマートなソリューションでした。残っている唯一の問題は、このブロックを正しい方法で完了することです。最初にすべての再試行が完了するのを待ってから、ターゲット ブロックを完了します。これが私が最終的に得たものです(これは単なるスニペットです。スレッドセーフではないretries
セットにはあまり注意を払わないでください):
アイデアは、何らかのポーリングを実行し、処理を待機しているメッセージがまだあるかどうか、および再試行が必要なメッセージがないかどうかを確認することです。しかし、このソリューションでは、ポーリングのアイデアは好きではありません。
はい、再試行の追加/削除のロジックを別のクラスにカプセル化できます。たとえば、再試行のセットが空になったときに何らかのアクションを実行することもできますが、target.InputCount > 0
条件を処理するにはどうすればよいですか? ブロックに保留中のメッセージがない場合に呼び出されるようなコールバックは存在しないため、target.ItemCount
遅延の少ないループで検証するしかないようです。
これを達成するためのよりスマートな方法を知っている人はいますか?
c# - ActionBlock は時々 MaxDegreeOfParallelism を無視するようです
MaxDegreeOfParallelism が 1 であるにもかかわらず、並列処理が発生しているように見える ActionBlock への投稿時に、予期しない動作が見られます。これがシナリオです。
ActionBlock に投稿するクラスは次のようになります。
下流では、バイトをオブジェクトにデシリアライズし、それらのオブジェクト (通知と呼びましょう) をそのタイプに応じてハンドラーに渡します。
このハンドラーには致命的なバグがあります: InitialNotifications は対応する SecondNotifications の前に到着しますが、HandleInitialNotification は終了する前に HandleSecondNotification を待機するため、スレッドは HandleSecondNotification に到達しません。
通常、HandleInitialNotification は、HandleSecondNotification の待機がタイムアウトになるまでブロックされ、保留中の SecondNotification が同じスレッドで処理されて実行が続行されます。これは通常、ログに表示されるものです。
これは、コードが意図された動作方法ではありませんが、記述された方法を考えると、SecondNotification を待って常にタイムアウトする必要があります。ただし、時折、HandleInitialNotification がタイムアウトする前に完了し、HandleSecondNotification が別のスレッドでタイムリーに処理されることもあります。
デフォルトの ActionBlock を使用しているため、MaxDegreeOfParallelism は 1 である必要があります。では、ActionBlock に投稿された元のスレッドがブロックしている間に、2 番目のスレッド (ActionBlock で発生) が SecondNotification を取得できるのはなぜでしょうか?