[この質問はReactive Extensions (Rx)の領域にあります]
アプリケーションの再起動時に継続する必要があるサブスクリプション
int nValuesBeforeOutput = 123;
myStream.Buffer(nValuesBeforeOutput).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
ここで、このサブスクリプションの状態をシリアライズおよびデシリアライズして、次にアプリケーションが開始されたときにバッファ カウントがゼロから開始されないようにする必要があります。
- この場合、どのように IObservable.Subscribe() の状態を保持し、後でそれをロードできますか?
- オブザーバーの状態を Rx に保存する一般的な解決策はありますか?
答えから解決へ
Paul Betts のアプローチに基づいて、最初のテストで機能した半一般化可能な実装を次に示します。
使用する
int nValuesBeforeOutput = 123;
var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
拡張方法
private static bool _alreadyRecording;
public static IObservable<T> Record<T>(this IObservable<T> input,
IRepositor repositor)
{
IObservable<T> output = input;
List<T> records = null;
if (repositor.Deserialize(ref records))
{
ISubject<T> history = new ReplaySubject<T>();
records.ForEach(history.OnNext);
output = input.Merge(history);
}
if (!_alreadyRecording)
{
_alreadyRecording = true;
input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
}
return output;
}
public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
IRepositor repositor)
{
input.Subscribe(i => repositor.Clear());
return input;
}
ノート
- これは、生成された値の間の時間間隔に依存する状態を保存する場合には機能しません
- T のシリアル化をサポートするシリアライザの実装が必要です
_alreadyRecording
myRecordableStream
複数回購読する場合に必要です_alreadyRecording
は静的ブール値であり、非常に醜く、並列サブスクリプションが必要な場合に拡張メソッドが複数の場所で使用されるのを防ぎます - 将来の使用のために再実装する必要があります