現在、RX フレームワークを使用して、ワークフローのようなメッセージ処理パイプラインを実装しています。基本的に、メッセージ プロデューサー (ネットワーク メッセージを逆シリアル化し、サブジェクトで OnNext() を呼び出す) があり、いくつかのコンシューマーがあります。
注: If と transform は、単に IObservable を返すようにコード化した拡張メソッドです。
コンシューマーは次のようなことを行います。
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
その後、別の同様のパイプラインによって消費され、これが最上部まで続きSubscribe()
、最後のパイプラインを誰かが呼び出して終了します。私が抱えている問題は、サブスクライブがメッセージに対して直接どこかで呼び出されない限り、ベースからのメッセージが伝播しないことです。
メッセージをスタックの一番上にプッシュするにはどうすればよいですか? これが非正統的なアプローチであることは承知していますが、メッセージに何が起こっているのかをコードが非常に簡単に理解できるようになると思います。これがまったくひどい考えだと思うなら、誰かが同じことをする別の方法を提案できますか?