24

のバッファをクリアするにはどうすればよいReplaySubjectですか?

定期的にバッファをクリアする必要があります (私の場合は 1 日の終わりのイベントとして)ReplaySubject継続的に成長し、最終的にすべてのメモリを消費するのを防ぎます。

ReplaySubject理想的には、クライアントのサブスクリプションがまだ良好であるため、同じ状態を維持したいと考えています。

4

3 に答える 3

22

ReplaySubjectバッファをクリアする手段を提供しませんが、さまざまな方法でバッファを制約するオーバーロードがいくつかあります。

  • TimeSpanアイテムが保持される最大時間
  • 最大アイテム数
  • 上記の組み合わせで、いずれかの条件が満たされるとすぐにアイテムをドロップします。

クリア可能な ReplaySubject

ReplaySubjectこれは非常に興味深い問題でした。既存のサブジェクトと演算子を使用して、 you can clearのバリエーションを実装するのがいかに簡単かを確認することにしました(これらは非常に堅牢であるため)。それはかなり簡単だったことがわかりました。

これをメモリプロファイラーで実行して、正しいことを確認しました。呼び出しClear()てバッファをフラッシュします。それ以外の場合は、通常の unbounded と同じように機能しますReplaySubject:

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();     
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();       
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

通常のルールを尊重し (すべての場合Subjectと同様)、このクラスのメソッドを同時に呼び出さないでください ( を含む) Clear()。必要に応じて、同期ロックを簡単に追加できます。

これは、マスター ReplaySubject 内に一連の ReplaySubject をネストすることによって機能します。外側の ReplaySubject ( _subjects) は、ちょうど 1 つの内側の ReplaySubject ( ) のバッファーを保持し、_currentSubject構築時に読み込まれます。

OnXXXメソッドは ReplaySubject を呼び出します_currentSubject

オブザーバーは、ネストされた ReplaySubjects ( に保持されている_concatenatedSubjects) の連結されたプロジェクションにサブスクライブされます。のバッファ サイズ_subjectsはちょうど 1 であるため、新しいサブスクライバは最新ReplaySubject以降のイベントのみを取得します。

「バッファをクリア」する必要があるときはいつでも、既存の_currentSubjectOnCompleted新しい ReplaySubject が追加され_subjects、新しい になり_currentSubjectます。

機能強化

@Brandon の提案に従って、または入力ストリームをRollingReplaySubject使用してバッファのクリアを通知するバージョンを作成しました。TimeSpanここでGistを作成しました:https://gist.github.com/james-world/c46f09f32e2d4f338b07

于 2015-03-09T15:13:45.590 に答える
0

Observable なデータ ソースが既にある可能性があります。その場合は、別の解決策があります。これは、私が個人的に警戒している独自の ISubject を構築するのではなく、既存の RX コンストラクトの合成を使用します。

public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
    private readonly IConnectableObservable<IObservable<TSource>> _underlying;
    private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();

    public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
    {
        _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
            .Select(_ =>
            {
                var underlyingReplay = src.Replay();
                _replayConnectDisposable.Disposable = underlyingReplay.Connect();
                return underlyingReplay;
            })
            .Replay(1);
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return _underlying.Switch().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
    }
}

次の拡張メソッドを ObservableEx に追加すると:

public static class ObservableEx
{
    public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
    {
        return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
    }
}

次に、ソースを取得して、リセットトリガー Observable を使用して.ReplayWithReset(...)を追加できます。これはタイマーでも何でもかまいません。

var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();

接続は、リプレイと同じように動作します。

于 2015-03-24T13:31:34.393 に答える