2

制御オブザーバブルによって定義された時間にいくつかのオブザーバブルをサンプリングする必要がある状況があります。

すぐにマーブル ダイアグラムにジャンプします。

以下の co は制御オブザーバブル (Observe.Interval(TimeSpan.FromSeconds(1))) です。

o* はさまざまなタイプのオブザーバブルです

co tick  ---------x---------x---------x---------x---------x---------x-
o1 int   --2----1-------4--------3--------1-------2----3-------5------
o2 bool  -t-----------f---------t---------------------------f---------

result   ---------Z---------Z---------Z---------Z---------Z---------Z-

制御チャネルにティックがある場合にのみ、各 o-observable の最新の値をサンプリングする拡張メソッドを開発する必要があります。

Z は、o-observable からの最新のサンプリング値をセレクター関数に渡した結果になります。これは、CombineLatest に似ていますが、完全ではありません。

つまり、非常に単純化した例として、func が次のようになっているとしましょう。

 (i, b) => {if (b) return i; else return 0; }

この場合の結果を

 result   ---------1---------0---------3---------1---------3---------0-

最初の 1 は、o2 が最後に真であり、o1 が最後に 1 だったからです。2 番目の 0 は、o2 が最後に偽だったからです。

o2 は、すべてのサンプル間で常に 1 つの値を生成するとは限らないことに注意してください。最後の値を取得する必要があります。(.Sample() は動作しません。) 実際には、関連する関数と型はより複雑なので、上記の int 型と bool 型のために仮定をしないでください。

また、ティックごとに1回だけ実行するセレクター関数が必要です。

これは私の現在のソリューションですが、CombineLatest の後にサンプリングするため、上記の要件に準拠していません。

 var combined = interval.CombineSampled(
            Status,
            H1,
            H2
            M,
            T,
            HMode,
            TMode,
            (i, s, hr1, hr2, m, t, hMode, tMode) =>
            {
               ... (left out)

               return result;
            });

結合サンプル:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
    {
        return controllingSource.Publish(s => s.CombineLatest(source2, source3, source4, source5, source6, source7, source8, selector).SampleEx(s));
    }


public static IObservable<T> SampleEx<T, S>( this IObservable<T> source, IObservable<S> samples )
    {
        // This is different from the Rx version in that source elements will be repeated, and that
        // we complete when either sequence ends. 
        return Observable.Create( ( IObserver<T> obs ) =>
            {
                object gate = new object();
                bool hasSource = false;
                var value = default(T);

                return new CompositeDisposable(
                    source.Synchronize( gate ).Subscribe( v => { value = v; hasSource = true; }, obs ),
                    samples.Synchronize( gate ).Subscribe( _ => { if ( hasSource ) obs.OnNext( value ); }, obs )
                );
            } );
    }
4

1 に答える 1