x 時間滞在する条件状態をサブスクライブする方法を知っていますか?
たとえばBehaviorSubject<int>
、0 から 100 までの int 値を表し、この値が時間の経過とともに変化している場合、この値が 50 未満で 10 秒間連続したときにサブスクライブしたいと考えています。
値が一瞬 50 を超えてから再び 50 を下回った場合、もう一度 10 秒間カウントしたいと思います。これどうやってするの?
どうもありがとう!
x 時間滞在する条件状態をサブスクライブする方法を知っていますか?
たとえばBehaviorSubject<int>
、0 から 100 までの int 値を表し、この値が時間の経過とともに変化している場合、この値が 50 未満で 10 秒間連続したときにサブスクライブしたいと考えています。
値が一瞬 50 を超えてから再び 50 を下回った場合、もう一度 10 秒間カウントしたいと思います。これどうやってするの?
どうもありがとう!
これは、私にとって適切に機能するように見える、かなりきちんとした観察可能なクエリです。
var below50longerthan10seconds =
subject
.Select(x => x < 50)
.DistinctUntilChanged()
.Select(x =>
Observable.Delay(
Observable.Return(x),
TimeSpan.FromSeconds(10.0)))
.Switch()
.Where(x => x)
.Select(x => Unit.Default);
これが内訳です。
値を 0 ~ 100 からtrue
50 未満の場合に変更し、それ以外のfalse
場合:
.Select(x => x < 50)
true
&の間の実際の変更のみを保持しますfalse
。
.DistinctUntilChanged()
10 秒間遅延する新しいオブザーバブルに値を射影します。
.Select(x =>
Observable.Delay(
Observable.Return(x),
TimeSpan.FromSeconds(10.0)))
ここで、次のことを実行します.Switch()
-x
遅延オブザーバブルの前に新しいものが来る場合、新しい遅延オブザーバブルが来るので、それは無視されます:
.Switch()
true
元のストリームが 50 未満だったときの値のみを選択します。
.Where(x => x)
値のない値Unit.Default
のストリームを持つのは奇妙であるという理由だけで選択します。true
false
.Select(x => Unit.Default);
これIObservable<Unit>
で、元のストリームが 50 未満の値を生成し、10 秒以内に 50 以上の値を生成しない場合に新しい値を解放する ができました。
それはあなたが望んでいたものですか?
何かを見逃していないことを願っていますが、最も簡単な方法は次のとおりです。
BehaviorSubject<int> value = new BehaviorSubject<int>(0);
value.Select(v => v < 50).DistinctUntilChanged().Throttle(TimeSpan.FromSeconds(10))
.Where(x => x).Subscribe(b => DoSomething());
エニグマティビティの答えは、これを処理する方法の始まりを与えてくれました。ありがとう!
@GideonEngelberth は、私の元のソリューション (以下) が機能しないことを強調しています。@Enigmativity からいくつかのアイデアを盗みましたが、ソースから値を取得するために「スイッチング ストリーム」を実際に使用するまでには至らなかったため、追加の手順を実行しました。
結果は次の拡張関数です。
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
var switches = published.Select(pred).DistinctUntilChanged();
return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
.SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}
これは、 を使用して、しきい値を下回るたびに 1 回 ( )、しきい値を超えるたびに 1 回 ( )Select(pred).DistinctUntilChanged()
起動するストリームを提供します。true
false
次に、このストリームを利用して、true
との間のオブザーバブルにウィンドウを作成しますfalse
。その新しいストリームの最初の N 秒をスキップし、まだしきい値を下回っている間に取ります。
オリジナル:(動作していません)
この問題は次のように分解されます。
ストリームが 50 未満の場合、最初の 10 秒間の値をスキップしてからストリームを返します。ストリームが 50 を超えるまでこれを行います。その後、最初からやり直します。
RX の優れた点は、これをそのまま関連する関数に変換できることです。これを正確に行う簡単な拡張メソッドを次に示します。
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
var openers = published.Where(pred); // start taking at this point
var closers = published.Where(z => !pred(z)); // stop taking at this point
return openers.SkipUntil(Observable.Timer(minTime))
.TakeUntil(closers)
.Repeat();
}
これが機能することを示すテストは次のとおりです。
var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);
zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
zs
ストリームは次のとおりです。
(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]
ルールに基づくと、最初の 2 秒間の一時停止でストリームがトリガーされ、2
値が 10 回表示されます。ただし、100
がヒットするとすぐにリセットされ、その後1
が再生されると、表示するのに十分な一時停止がありません1
。これが数回繰り返され、数字だけ2
が表示されます。