2

サブスクライバーが通常どおりリプレイ バッファーと将来の通知を受信できるようにする大きな ReplaySubject があります。さらに、現在のバッファの「スナップショット」を取得し、サブスクライブすることなく、それをリストとして同期的に返すことができるようにしたいと考えています。

これを行う方法はありますか?

ありがとう

4

2 に答える 2

2

購読してアイテムを受け取り、購読を解除するだけではありませんか?

public static List<T> Snapshot<T>(ReplaySubject<T> subject)
{
    List<T> snapshot = new List<T>();
    using (subject.Subscribe(item => snapshot.Add(item))
    {
        // Deliberately empty; subscribing will add everything to the list.
    }
    return snapshot;
}

ReplaySubject<T>もちろん、これは、a をサブスクライブすると、要素ハンドラーが同期的に呼び出されることを前提としています。あなたはそれを確認したいでしょうが、それは私が期待することです.

また、何らかの方法でエラー/完了を処理するかどうかも検討する必要があります。

于 2013-03-03T13:03:52.573 に答える
0

@PaulBettsがそれを行う唯一の方法があると言ったからです:)

注:この方法はお勧めしません。スキート射撃の方法を使用すると、後で自分に感謝します。

したがって、ReplaySubject<T>すべての魔法はOnNext、内部で受け取った値をキューに入れるという事実と関係がありますQueue<TimeInterval<T>>。したがって、リプレイサブジェクトのプライベートな詳細の内部をいじくり回してその情報を取得するラッパーを作成できます。

public class FixedReplaySubject<T> : ISubject<T>
{
    private ReplaySubject<T> _inner;
    private Func<Queue<TimeInterval<T>>> _snapshotGetter;

    public FixedReplaySubject(ReplaySubject<T> source)
    {
        _inner = source;
        var expr = Expression.Lambda(
            typeof(Func<Queue<TimeInterval<T>>>), 
            Expression.Field(
                Expression.Constant(source), 
                source.GetType()
                    .GetField("_queue", BindingFlags.NonPublic|BindingFlags.Instance)));
        _snapshotGetter = (Func<Queue<TimeInterval<T>>>)expr.Compile();
    }

    public IEnumerable<TimeInterval<T>> Snapshot()
    {
        return _snapshotGetter();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _inner.Subscribe(observer);
    }
    public void OnNext(T value)
    {
        _inner.OnNext(value);
    }
    public void OnCompleted()
    {
        _inner.OnCompleted();
    }
    public void OnError(Exception error)
    {
        _inner.OnError(error);
    }
    public void Dispose()
    {
        _inner.Dispose();
    }
}

テストリグ:

void Main()
{
    var src = new ReplaySubject<int>();
    src.OnNext(1);
    src.OnNext(2);
    src.OnNext(3);
    src.OnNext(4);
    src.OnNext(5);
    src.OnNext(6);
    var heh = new FixedReplaySubject<int>(src);
    heh.Snapshot().Dump();
}

結果:(TimeInterval<T>値+入った時間だけです)

1 00:00:00.0010265 
2 00:00:00.0010278 
3 00:00:00.0010278 
4 00:00:00.0010282 
5 00:00:00.0010282 
6 00:00:00.0010286 
于 2013-03-04T22:33:07.280 に答える