次のように公開された着信接続があります。
IObservable<double>
新しい接続が確立されるたびに、次の方法で通知します。
IObservable<IObservable<double>> m_IncomingConnections;
私の問題は、私が使用できるオペレーターを構築したいということです
IObservable<IList<double>> m_joined = m_IncomingConnections.JoinAll();
したがって、最初に 3 つの着信接続があり、m_joined オブザーバブルが 3 つの要素を持つリストをプッシュするとします。
|--a1----a2----------a3-------
|b1---b2----b3--b4---------b5-
|---c1----c2---c3--c4---------
結果:
>[a1,b1,c1]
>[a1,b2,c1]
>[a2,b2,c1]
>[a2,b2,c2]
>[a2,b3,c2]
アイデアはわかりましたが、新しい接続が入ってきた場合、新しい接続が新しい値をプッシュするとすぐに配列のサイズを 4 にしたいと思います。
これですべてが使用できるようになりました
m_joined.Select(vv => vv.Average());
新しい値がプッシュされるたびに、すべてのストリームの平均値をストリーミングします。
Rx にそのような拡張機能を実装することについて何か提案はありますか?
助けてくれてありがとう。