7

私は、IObservable<IObservable<T>>各内部IObservable<T>が値のストリームであり、その後に最終的なOnCompletedイベントが続く場所を持っています。

IObservable<IEnumerable<T>>これを、完了していない内部ストリームからの最新の値で構成されるストリームに変換したいと思います。IEnumerable<T>内部ストリームの 1 つから新しい値が生成される (または内部ストリームが期限切れになる) たびに、新しい値を生成する必要があります。

マーブル ダイアグラムで最も簡単に示されます (十分に包括的であることを願っています)。

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            

([]は空で、OnCompletedIEnumerable<T>を表します)-|

少し操作に似ていることがわかりますCombineLatest。あれこれいじってみJoinGroupJoinが無駄だったが、それはほぼ確実に正しい方向に向かっていると感じている.

この演算子でできるだけ少ない状態を使用したいと思います。

アップデート

この質問を更新して、単一値のシーケンスだけでなく、結果IObservable<IEnumerable<T>>には各シーケンスの最新の値のみを含める必要があります。シーケンスが値を生成していない場合は、含める必要はありません。

4

3 に答える 3

3

これは、昨日のソリューションに基づいたバージョンで、新しい要件に合わせて調整されています。基本的な考え方は、腐りやすいコレクションに参照を入れて、内部シーケンスが新しい値を生成するときに参照の値を更新することです。

また、内部サブスクリプションを適切に追跡し、外部オブザーバブルがサブスクライブ解除された場合はサブスクライブ解除するように変更しました。

また、いずれかのストリームでエラーが発生した場合はすべて破棄するように変更されました。

最後に、Rx ガイドラインに違反する可能性のあるいくつかの競合状態を修正しました。内部のオブザーバブルが異なるスレッドから同時に発火している場合、同時に呼び出しが発生する可能性がありますがobs.OnNext、これは大きな問題です。そのため、同じロックを使用して内部の各オブザーバブルをゲートして、それを防ぎました (Synchronize呼び出しを参照)。このためPerishableCollection、コレクションを使用するすべてのコードがロック内にあるため、 のスレッド保証は必要ないため、 の代わりに通常の二重連結リストを使用することでおそらく回避できることに注意してくださいPerishableCollection

// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value, lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },
                    obs.OnError, // handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                            subscriptions.Remove(subscription); // remove this subscription
                        }
                    }
            );
        });

        return subscriptions;
    });
}
于 2013-07-29T22:27:00.720 に答える