13

私が遭遇したユースケースは、私だけではないと思われますが、次のような方法です。

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);

これは、内部のobservableから将来のすべてのアイテムを返しますが、内部のobservableが一定期間OnNextを呼び出さない場合(maxQuietPeriod)、最後の値を繰り返すだけです(もちろん、内部がOnCompletedまたはOnErrorを呼び出すまで) 。

正当な理由は、サービスが定期的なステータス更新を定期的にpingアウトすることです。例えば:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h,
    h=>this.StatusUpdate-=h);

var messageBusStatusPinger = myStatus
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
    .Subscribe(update => _messageBus.Send(update));

このようなものはありますか?それとも私はそれの有用性を過大評価していますか?

ありがとう、アレックス

PS:Rxを初めて調べているだけなので、用語や構文が間違っていることをお詫びします。

4

4 に答える 4

10

Matthewと同様の解決策ですが、ここでは、各要素がソースで受信された後にタイマーが開始します。これはより正しいと思います(ただし、違いが問題になる可能性は低いです)。

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{    
    return inner.Select(x => 
        Observable.Interval(maxQuietPeriod)
                  .Select(_ => x)
                  .StartWith(x)
    ).Switch();
}

そしてテスト:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
                       .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

110回印刷され(ソースから5回、無音中に5回繰り返される)、2ソースから1回、それぞれの間の無音からさらに4回印刷され、その後に無限が続くはず3です。

于 2012-07-13T05:18:13.353 に答える
9

この非常に単純なクエリがその役割を果たします。

var query =
    source
        .Select(s =>
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .StartWith(s)
                .Select(x => s))
        .Switch();

の力を過小評価しないでください.Switch()

于 2012-07-13T09:41:49.113 に答える
1

私はこれがあなたが望むことをしていると思います、あなたの観測量が熱くないならあなたはそうする必要があるでしょうPublishそしてRefcountそれ:

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
    var throttled = inner.Throttle(maxQuietPeriod);
    var repeating = throttled.SelectMany(i => 
        Observable
            .Interval(maxQuietPeriod)
            .Select(_ => i)
            .TakeUntil(inner));
    return Observable.Merge(inner, throttled, repeating);
}
于 2012-07-13T00:55:48.513 に答える
-1

Rxライブラリにはメソッドはありませんが、そのようなメソッドも必要でした。私のユースケースでは、ソースが値を出力しなくても値を出力する必要がありました。最初のソース値が取得されるまで値を出力したくない場合は、サブスクライブ呼び出しの前にdefaultValueパラメーターと呼び出しを削除できます。createTimer()

タイマーを実行するには、スケジューラーが必要です。明らかな過負荷は、スケジューラーを使用せず、デフォルトのスケジューラーを選択するものです(私はThreadPoolスケジューラーを使用しました)。

Imports System.Reactive
Imports System.Reactive.Concurrency
Imports System.Reactive.Disposables
Imports System.Reactive.Linq

<Extension()>
Public Function AtLeastEvery(Of T)(source As IObservable(Of T), 
                                   timeout As TimeSpan, 
                                   defaultValue As T, 
                                   scheduler As IScheduler
                                  ) As IObservable(Of T)
    If source Is Nothing Then Throw New ArgumentNullException("source")
    If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")
    Return Observable.Create(
        Function(observer As IObserver(Of T))
            Dim id As ULong
            Dim gate As New Object()
            Dim timer As New SerialDisposable()
            Dim lastValue As T = defaultValue

            Dim createTimer As Action =
                Sub()
                    Dim startId As ULong = id
                    timer.Disposable = scheduler.Schedule(timeout,
                                           Sub(self As Action(Of TimeSpan))
                                               Dim noChange As Boolean
                                               SyncLock gate
                                                   noChange = (id = startId)
                                                   If noChange Then
                                                       observer.OnNext(lastValue)
                                                   End If
                                               End SyncLock
                                               'only restart if no change, otherwise
                                               'the change restarted the timeout
                                               If noChange Then self(timeout)
                                           End Sub)
                End Sub
            'start the first timeout
            createTimer()
            'subscribe to the source observable
            Dim subscription = source.Subscribe(
                Sub(v)
                    SyncLock gate
                        id += 1UL
                        lastValue = v
                    End SyncLock
                    observer.OnNext(v)
                    createTimer() 'reset the timeout
                End Sub,
                Sub(ex)
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnError(ex)
                    'do not reset the timeout, because the sequence has ended
                End Sub,
                Sub()
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnCompleted()
                    'do not reset the timeout, because the sequence has ended
                End Sub)

            Return New CompositeDisposable(timer, subscription)
        End Function)
End Function
于 2012-07-12T17:32:58.450 に答える