5

x 時間滞在する条件状態をサブスクライブする方法を知っていますか?

たとえばBehaviorSubject<int>、0 から 100 までの int 値を表し、この値が時間の経過とともに変化している場合、この値が 50 未満で 10 秒間連続したときにサブスクライブしたいと考えています。

値が一瞬 50 を超えてから再び 50 を下回った場合、もう一度 10 秒間カウントしたいと思います。これどうやってするの?

どうもありがとう!

4

3 に答える 3

7

これは、私にとって適切に機能するように見える、かなりきちんとした観察可能なクエリです。

var below50longerthan10seconds =
    subject
        .Select(x => x < 50)
        .DistinctUntilChanged()
        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))
        .Switch()
        .Where(x => x)
        .Select(x => Unit.Default);

これが内訳です。

値を 0 ~ 100 からtrue50 未満の場合に変更し、それ以外のfalse場合:

        .Select(x => x < 50)

true&の間の実際の変更のみを保持しますfalse

        .DistinctUntilChanged()

10 秒間遅延する新しいオブザーバブルに値を射影します。

        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))

ここで、次のことを実行します.Switch()-x遅延オブザーバブルの前に新しいものが来る場合、新しい遅延オブザーバブルが来るので、それは無視されます:

        .Switch()

true元のストリームが 50 未満だったときの値のみを選択します。

        .Where(x => x)

値のない値Unit.Defaultのストリームを持つのは奇妙であるという理由だけで選択します。truefalse

        .Select(x => Unit.Default);

これIObservable<Unit>で、元のストリームが 50 未満の値を生成し、10 秒以内に 50 以上の値を生成しない場合に新しい値を解放する ができました。

それはあなたが望んでいたものですか?

于 2012-08-01T02:07:05.073 に答える
4

何かを見逃していないことを願っていますが、最も簡単な方法は次のとおりです。

BehaviorSubject<int> value = new BehaviorSubject<int>(0);

value.Select(v => v < 50).DistinctUntilChanged().Throttle(TimeSpan.FromSeconds(10))
.Where(x => x).Subscribe(b => DoSomething());

エニグマティビティの答えは、これを処理する方法の始まりを与えてくれました。ありがとう!

于 2012-08-01T09:11:13.293 に答える
1

@GideonEngelberth は、私の元のソリューション (以下) が機能しないことを強調しています。@Enigmativity からいくつかのアイデアを盗みましたが、ソースから値を取得するために「スイッチング ストリーム」を実際に使用するまでには至らなかったため、追加の手順を実行しました。

結果は次の拡張関数です。

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var switches = published.Select(pred).DistinctUntilChanged();

    return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
                    .SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}

これは、 を使用して、しきい値を下回るたびに 1 回 ( )、しきい値を超えるたびに 1 回 ( )Select(pred).DistinctUntilChanged()起動するストリームを提供します。truefalse

次に、このストリームを利用して、trueとの間のオブザーバブルにウィンドウを作成しますfalse。その新しいストリームの最初の N 秒をスキップし、まだしきい値を下回っている間に取ります。


オリジナル:(動作していません)

この問題は次のように分解されます。

ストリームが 50 未満の場合、最初の 10 秒間の値をスキップしてからストリームを返します。ストリームが 50 を超えるまでこれを行います。その後、最初からやり直します。

RX の優れた点は、これをそのまま関連する関数に変換できることです。これを正確に行う簡単な拡張メソッドを次に示します。

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var openers = published.Where(pred);            // start taking at this point
    var closers = published.Where(z => !pred(z));   // stop taking at this point

    return openers.SkipUntil(Observable.Timer(minTime))
                  .TakeUntil(closers)
                  .Repeat();
}

これが機能することを示すテストは次のとおりです。

var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);

zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

zsストリームは次のとおりです。

(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]

ルールに基づくと、最初の 2 秒間の一時停止でストリームがトリガーされ、2値が 10 回表示されます。ただし、100がヒットするとすぐにリセットされ、その後1が再生されると、表示するのに十分な一時停止がありません1。これが数回繰り返され、数字だけ2が表示されます。

于 2012-07-31T21:35:33.427 に答える