のバッファをクリアするにはどうすればよいReplaySubject
ですか?
定期的にバッファをクリアする必要があります (私の場合は 1 日の終わりのイベントとして)ReplaySubject
継続的に成長し、最終的にすべてのメモリを消費するのを防ぎます。
ReplaySubject
理想的には、クライアントのサブスクリプションがまだ良好であるため、同じ状態を維持したいと考えています。
のバッファをクリアするにはどうすればよいReplaySubject
ですか?
定期的にバッファをクリアする必要があります (私の場合は 1 日の終わりのイベントとして)ReplaySubject
継続的に成長し、最終的にすべてのメモリを消費するのを防ぎます。
ReplaySubject
理想的には、クライアントのサブスクリプションがまだ良好であるため、同じ状態を維持したいと考えています。
ReplaySubject
バッファをクリアする手段を提供しませんが、さまざまな方法でバッファを制約するオーバーロードがいくつかあります。
TimeSpan
アイテムが保持される最大時間ReplaySubject
これは非常に興味深い問題でした。既存のサブジェクトと演算子を使用して、 you can clearのバリエーションを実装するのがいかに簡単かを確認することにしました(これらは非常に堅牢であるため)。それはかなり簡単だったことがわかりました。
これをメモリプロファイラーで実行して、正しいことを確認しました。呼び出しClear()
てバッファをフラッシュします。それ以外の場合は、通常の unbounded と同じように機能しますReplaySubject
:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
通常のルールを尊重し (すべての場合Subject
と同様)、このクラスのメソッドを同時に呼び出さないでください ( を含む) Clear()
。必要に応じて、同期ロックを簡単に追加できます。
これは、マスター ReplaySubject 内に一連の ReplaySubject をネストすることによって機能します。外側の ReplaySubject ( _subjects
) は、ちょうど 1 つの内側の ReplaySubject ( ) のバッファーを保持し、_currentSubject
構築時に読み込まれます。
OnXXX
メソッドは ReplaySubject を呼び出します_currentSubject
。
オブザーバーは、ネストされた ReplaySubjects ( に保持されている_concatenatedSubjects
) の連結されたプロジェクションにサブスクライブされます。のバッファ サイズ_subjects
はちょうど 1 であるため、新しいサブスクライバは最新ReplaySubject
以降のイベントのみを取得します。
「バッファをクリア」する必要があるときはいつでも、既存の_currentSubject
とOnCompleted
新しい ReplaySubject が追加され_subjects
、新しい になり_currentSubject
ます。
@Brandon の提案に従って、または入力ストリームをRollingReplaySubject
使用してバッファのクリアを通知するバージョンを作成しました。TimeSpan
ここでGistを作成しました:https://gist.github.com/james-world/c46f09f32e2d4f338b07
Observable なデータ ソースが既にある可能性があります。その場合は、別の解決策があります。これは、私が個人的に警戒している独自の ISubject を構築するのではなく、既存の RX コンストラクトの合成を使用します。
public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
private readonly IConnectableObservable<IObservable<TSource>> _underlying;
private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();
public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
{
_underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
.Select(_ =>
{
var underlyingReplay = src.Replay();
_replayConnectDisposable.Disposable = underlyingReplay.Connect();
return underlyingReplay;
})
.Replay(1);
}
public IDisposable Subscribe(IObserver<TSource> observer)
{
return _underlying.Switch().Subscribe(observer);
}
public IDisposable Connect()
{
return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
}
}
次の拡張メソッドを ObservableEx に追加すると:
public static class ObservableEx
{
public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
{
return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
}
}
次に、ソースを取得して、リセットトリガー Observable を使用して.ReplayWithReset(...)を追加できます。これはタイマーでも何でもかまいません。
var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();
接続は、リプレイと同じように動作します。