2

センサー データの観測可能なホット ストリームがあります。センサー値が一定期間 15 を下回った場合にのみ発火する観測可能な信号が必要です。値が 15 を超えた場合はいつでも、スライディング ウィンドウをリセットする必要があります。以下のコードで部分的に動作するようにしましたが、値が常に 15 未満の場合はトリガーされません。

var notification = _sensor.Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));

助言がありますか?

4

2 に答える 2

1

_sensor15 以上の値を発行しない場合、Throttleは呼び出されません。

_sensor簡単な修正は、またはにウェイクアップ通知を追加することですnotification

var wakeup = Observable.Return(15);

var notification = _sensor.Merge(wakeup)
                          .Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));
于 2015-09-09T07:38:48.783 に答える
0

あなたの必要性を正しく理解していれば、これはうまくいきます:

var notification = _sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromMinutes(1.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

私はそれがであると仮定し_sensorましたIObservable<double>

_sensorしたがって、このオブザーバブルは、値が少なくとも 1 分間 15.0 を下回っている限り、ストリームからすべての値を公開します。この機能をテストしました。

私のテストコードは次のとおりです。

var random = new Random();
var _sensor = Observable.Generate(
    0,
    x => true,
    x => x,
    x => random.NextDouble() * 16.0,
    x => TimeSpan.FromSeconds(random.NextDouble()));

var published_sensor = _sensor.Publish();

var notification = published_sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromSeconds(5.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

published_sensor.Merge(notification).Timestamp().Dump();

published_sensor.Connect();

私が得た結果は次のとおりです。

結果

重複値が公開されるまでに 5 秒が経過し、ソースが 15 を超える値を生成すると重複が停止することに注意してください。

于 2015-09-09T08:23:07.440 に答える