6

[この質問はReactive Extensions (Rx)の領域にあります]

アプリケーションの再起動時に継続する必要があるサブスクリプション

int nValuesBeforeOutput = 123;

myStream.Buffer(nValuesBeforeOutput).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

ここで、このサブスクリプションの状態をシリアライズおよびデシリアライズして、次にアプリケーションが開始されたときにバッファ カウントがゼロから開始されないようにする必要があります

  • この場合、どのように IObservable.Subscribe() の状態を保持し、後でそれをロードできますか?
  • オブザーバーの状態を Rx に保存する一般的な解決策はありますか?



答えから解決へ

Paul Betts のアプローチに基づいて、最初のテストで機能した半一般化可能な実装を次に示します。

使用する

int nValuesBeforeOutput = 123;

var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

拡張方法

    private static bool _alreadyRecording;

    public static IObservable<T> Record<T>(this IObservable<T> input,
                                           IRepositor repositor) 
    {
        IObservable<T> output = input;
        List<T> records = null;
        if (repositor.Deserialize(ref records))
        {
            ISubject<T> history = new ReplaySubject<T>();
            records.ForEach(history.OnNext);
            output = input.Merge(history);
        }
        if (!_alreadyRecording)
        {
            _alreadyRecording = true;
            input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
        }
        return output;
    }

    public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
                                                 IRepositor repositor)
    {
        input.Subscribe(i => repositor.Clear());
        return input;
    }

ノート

  • これは、生成された値の間の時間間隔に依存する状態を保存する場合には機能しません
  • T のシリアル化をサポートするシリアライザの実装が必要です
  • _alreadyRecordingmyRecordableStream複数回購読する場合に必要です
  • _alreadyRecordingは静的ブール値であり、非常に醜く、並列サブスクリプションが必要な場合に拡張メソッドが複数の場所で使用されるのを防ぎます - 将来の使用のために再実装する必要があります
4

1 に答える 1

1

これには一般的な解決策はなく、作成することは NonTrivial™ になります。あなたができる最も近いことは、myStream を何らかのリプレイ Observable にすることです (つまり、状態をシリアル化する代わりに、myStream の状態をシリアル化し、作業をやり直して元の場所に戻します)。

于 2012-04-10T20:42:09.610 に答える