あなたの必要性を正しく理解していれば、これはうまくいきます:
var notification = _sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromMinutes(1.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
私はそれがであると仮定し_sensor
ましたIObservable<double>
。
_sensor
したがって、このオブザーバブルは、値が少なくとも 1 分間 15.0 を下回っている限り、ストリームからすべての値を公開します。この機能をテストしました。
私のテストコードは次のとおりです。
var random = new Random();
var _sensor = Observable.Generate(
0,
x => true,
x => x,
x => random.NextDouble() * 16.0,
x => TimeSpan.FromSeconds(random.NextDouble()));
var published_sensor = _sensor.Publish();
var notification = published_sensor.Publish(ps => ps
.Select(x => x >= 15.0)
.DistinctUntilChanged()
.Select(p => p
? Observable.Empty<double>()
: Observable
.Timer(TimeSpan.FromSeconds(5.0))
.Select(x => -1.0)
.IgnoreElements()
.Concat(ps))
.Switch());
published_sensor.Merge(notification).Timestamp().Dump();
published_sensor.Connect();
私が得た結果は次のとおりです。
重複値が公開されるまでに 5 秒が経過し、ソースが 15 を超える値を生成すると重複が停止することに注意してください。