私は、IObservable<IObservable<T>>
各内部IObservable<T>
が値のストリームであり、その後に最終的なOnCompleted
イベントが続く場所を持っています。
IObservable<IEnumerable<T>>
これを、完了していない内部ストリームからの最新の値で構成されるストリームに変換したいと思います。IEnumerable<T>
内部ストリームの 1 つから新しい値が生成される (または内部ストリームが期限切れになる) たびに、新しい値を生成する必要があります。
マーブル ダイアグラムで最も簡単に示されます (十分に包括的であることを願っています)。
input ---.----.---.----------------
| | '-f-----g-|
| 'd------e---------|
'a--b----c-----|
result ---a--b-b--c-c-c-e-e-e---[]-
d d d e f g
f f
([]
は空で、OnCompletedIEnumerable<T>
を表します)-|
少し操作に似ていることがわかりますCombineLatest
。あれこれいじってみJoin
たGroupJoin
が無駄だったが、それはほぼ確実に正しい方向に向かっていると感じている.
この演算子でできるだけ少ない状態を使用したいと思います。
アップデート
この質問を更新して、単一値のシーケンスだけでなく、結果IObservable<IEnumerable<T>>
には各シーケンスの最新の値のみを含める必要があります。シーケンスが値を生成していない場合は、含める必要はありません。