0

.net RXを使用してデータプロセッサのシーケンスを整理するための最良の方法は何ですか?
-a。--bのようにobservableでメソッドを呼び出します observable.Do(log).Select(transformation).Do(work).Aggregate(someState)...
。カスタムオブザーバーを実装する場合は、それらをチェーンする方法
-c。その他のオプション..また、observable自体で発生する可能性のある例外を処理し(上記の私の懸念を参照)、Do、Selectなどの内部で例外を処理するための最良のオプションは何ですか(ベストプラクティスはサブスクライバーがスローしないことです)。

また、シーケンスを停止せずに監視可能なシーケンスの一部の要素として例外が返されることを許可する必要がある場合もあります(この質問「シーケンスを停止せずにリアクティブ拡張で例外を処理する」を参照してください)

4

1 に答える 1

2

これは、Rxの代わりにワークフローが必要なようです。他の質問*に基づいているようですが、ProducerConsumerのキューイングワークフローシナリオに非常に適していると思われるものを取得し、それをRxに強制しようとしています。

あなたのように見えます

  • キューから読み取り、値が受信されるまでブロックしてから、再サブスクライブします。BlockingCollectionのキューイング機能を使用するだけです。値が到着すると、任意のスレッドからコレクションにプッシュできます。
  • 実際に一連の値を設定する必要はありませんが、特定の値で失敗する可能性のあるワークフローは、それを迂回させてから次の値を処理します。キューを使用するだけです。各値を処理し、結果を次の適切なキューに入れます(失敗/成功)->エンタープライズ統合パターン、特に無効なメッセージチャネルとデッドレターチャネルを参照してください。
  • これらの値を並行して処理する可能性があります。高性能の実装については、BlockingCollection.GetConsumingEnumerableまたはDisruptorを参照してください。これらのツールを使用すると、多くの生産物と消費者を持つことができます。確かにこれはRxで実行できますが、それはスレッドをポーリングして拘束するだけです。このようなことを明確にしたほうがいいと思います

Rxの使用はそれ自体で報われるべきであり、(他のテクノロジーやフレームワークのように)それと戦うべきではないと思います。

*シーケンスを 停止せずにReactiveExtensionsで例外を処理する方法や、Observablesをクラウドにシリアル化して戻す方法などの他の質問

于 2012-11-14T18:15:21.867 に答える