2

私がやりたいことの宣言は次のようになります。

// Checks input source for timeouts, based on the number of elements received 
// from clock since the last one received from source. 
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, 
        int countForTimeout,
        Func<R> timedOutSelector, 
        Func<T1, R> okSelector)

アスキーではマーブル ダイアグラムは難しいですが、次のようになります。

source --o---o--o-o----o-------------------o---
clock  ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---

組み合わせて使用​​できる方法で既存のObservable関数を探してみましたが、ほとんどの組み合わせ関数は、「それぞれの 1 つ」( 、 )を受け取ることに依存しているか、「前の」値を1 つ「不足」している ( )、または必要なものから離れすぎている ( 、、、、) 。近いように見えますが、ソース スループットをクロック レートに制限したくありません。sourceclockAndZipCombineLatestAmbGroupJoinJoinMergeSelectManyTimeoutSample

だから今、私はここで大きなギャップを埋めようとして立ち往生しています:

return new AnonymousObservable<R>(observer =>
{
    //One observer, two observables??
});

申し訳ありませんが、「何を試しましたか」セクションはここでは少し不十分です: 私がそれについて考えてみたとしましょう! 完全な実装を求めているわけではありません。

  • 私が見逃したのに役立つ組み込み関数はありますか?
  • 2 つのオブザーバブルをサブスクライブするラムダベースのオブザーバーを構築するにはどうすればよいですか?
4

4 に答える 4

6

完全な実装を求めていないことは知っていますが、これが解決策だと思います。

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return source
        .Select(i => clock.Take(countForTimeout).LastAsync())
        .Switch().Select(_ => timedOutSelector())
        .Merge(source.Select(okSelector));
}

次のように動作します-出力は、タイムアウトイベントとマージされたokSelectorによって投影されたソースであることに気付きました。後は簡単なので、タイムアウト イベントの生成に集中します。

アイデアは、ソースが放出するたびにカウントダウンを作成し、クロックパルスごとにこのカウントダウンを減らすことです。ソースが発行する場合はカウントダウンを中止します。それ以外の場合は、カウントダウンが 0 に達したときに timedOut イベントを生成します。

それを分解する:

  1. 各ソース要素を、最初の要素を取るストリームにcountForTimeout射影します。countDown イベントごとにサブスクライブするため、クロックストリームは「ホット」なオブザーバブルでなければならないことに注意してください。クロック ストリームが熱くなるのはごく普通のことです。これがイベントを取得すると、タイムアウトになります。
  2. Switch最新のカウントダウン ストリーム以外はすべて破棄されます。
  3. SelecttimedOut イベントに投影するために使用します。
  4. ソースイベントをマージするだけです。

これは、私が使用した単体テストであり、マーブル ダイアグラムによく似ているように設計されています (コンパイルに必要なライブラリの nuget rx-testing & nunit):

    [Test]
    public void AKindOfTimeoutTest()
    {
        var scheduler = new TestScheduler();

        var clockStream = scheduler.CreateHotObservable(
            OnNext(100, Unit.Default),
            OnNext(200, Unit.Default),
            OnNext(300, Unit.Default),
            OnNext(400, Unit.Default),
            OnNext(500, Unit.Default),
            OnNext(600, Unit.Default),
            OnNext(750, Unit.Default), /* make clock funky! */
            OnNext(800, Unit.Default),
            OnNext(900, Unit.Default));


        var sourceStream = scheduler.CreateColdObservable(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(900, 6));


        Func<int> timedOutSelector = () => 0;
        Func<int, int> okSelector = i => i;

        var results = scheduler.CreateObserver<int>();

        sourceStream.TimeoutDetector(clockStream, 3, timedOutSelector, okSelector)
                    .Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(750, 0),
            OnNext(900, 6));
    }
}

特定の質問に答えるには:

  • Q. 私が見逃していた、役立つ組み込み関数はありますか? A. おそらくスキャンが鍵です。
  • Q. 2 つのオブザーバブルにサブスクライブするラムダベースのオブザーバーを構築するにはどうすればよいですか? A. これが何を意味するのかよくわかりません... ストリームを結合する方法はたくさんありますが、そのほとんどについて言及されました。
于 2013-11-07T08:35:44.703 に答える
2

これが私がほのめかしたObservable.Createアプローチです(同じテストが機能します):

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return Observable.Create<TR>(observer =>
        {
            var counter = countForTimeout;

            var timeoutSub = clock.Subscribe(_ =>
                {
                    var count = Interlocked.Decrement(ref counter);
                    if (count == 0)
                    {
                        observer.OnNext(timedOutSelector());
                    }
                },
                observer.OnError,
                observer.OnCompleted);

            var sourceSub = source.Subscribe(
                i =>
                {
                    Interlocked.Exchange(ref counter, countForTimeout);
                    observer.OnNext(okSelector(i));
                },
                observer.OnError,
                observer.OnCompleted);

            return new CompositeDisposable(sourceSub, timeoutSub);
        });
}

Observable.Create は、正しい Rx 文法が使用されていることを確認するのに非常に役立ちます (つまり、ストリームは OnNext* (OnError | OnCompleted) を発行しますか?) - これは、OnError または OnCompleted を最大 1 回送信することについて多少緩和できることを意味します。

于 2013-11-07T09:56:06.520 に答える
0

私はこれを思いつきましたが、これはジェームズの答えよりもかなり劣っています。

public static IObservable<R> TimeoutDetector2<T1, T2, R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, int maxDiff,
        Func<R> timedOutSelector, Func<T1, R> okSelector)
{
    return new AnonymousObservable<R>(observer =>
    {
        int counter = 0;
        object gate = new object();
        bool error = false;
        bool completed = false;
        bool timedOut = false;
        var sourceSubscription = source.Subscribe(
            x =>
            {
                lock(gate)
                {
                    if(!error && !completed) observer.OnNext(okSelector(x));
                    counter = 0;
                    timedOut = false;
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });
        var clockSubscription = clock.Subscribe(
            x =>
            {
                lock(gate)
                {
                    counter = counter + 1;
                    if(!error && !completed && counter > maxDiff && !timedOut)
                    {
                        timedOut = true;
                        observer.OnNext(timedOutSelector());
                    }
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });

        //need to return a subscription
        return new CompositeDisposable(sourceSubscription, clockSubscription);
    }).Publish().RefCount(); // prevent subscribers provoking more than one subscription to source and clock
}
于 2013-11-07T09:30:22.267 に答える