編集: ここで物事を単純化するために、パラダイムがあります: 継続的なストリームによって常に更新されているアイテムのリストがあります。ときどき、ストリームを再初期化する新しいデータ スナップショットを取得します。その結果、再初期化したいときに更新が発生した場合、これらの更新が停止し、新しいスナップショットが使用されていることを確認する必要があります。
UI に表示する必要がある更新の連続データ ストリームを多数処理しています。更新は逆の順序で表示する必要があります。つまり、最新の更新がリストの一番上に表示されます。結果を一番上に表示するには、リストに挿入する必要があります。いつかリストを休ませる必要があるという問題(つまり、List.Clear)ですが、挿入の途中である場合は停止する必要があります。そうしないと、挿入すると例外が発生するためです。
私はそれを助けるために反応的な方法をまとめましたが、それは私のまでのストリームを無視しているようです。
public static IObservable<T> BufferAndDispatchUntil<T, TStopUnit>(
this IObservable<T> source,
Action<T> onNext,
IScheduler scheduler,
IObservable<TStopUnit> until,
DispatcherPriority dispatcherPriority = DispatcherPriority.Background)
{
if (source == null) throw new ArgumentNullException("source");
if (onNext == null) throw new ArgumentNullException("onNext");
if (Application.Current == null)
return source.Do(onNext);
var dispatcher = Application.Current.Dispatcher;
return source
.LazyBuffer(BufferTime, BufferCount, scheduler)
.TakeUntil(until)
.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
.SelectMany(i => i);
}
LazyBuffer は Buffer のカスタム実装であり、指定された間隔で空の結果セットを返すのではなく、新しい項目が利用可能になったときにのみ結果セットを返します。これが私がそれを呼び出す方法です、それは上記のように吹きます。
BufferAndDispatchUntil(p => Update.Insert(p.Item1, UpdateFactory.CreateView(p.Item2)), _config.DispatcherScheduler, _ignore);
これは、別のスレッドで実行されているコードの別のセグメントでの明確な呼び出しです。
_ignore.OnNext(new Unit());
Update.Clear();
それを理解するのを手伝っていただければ幸いです。