IObservable<T>
述語に一致する要素を探して実行したいのですが、見つからない場合は、 の最後の要素を返しIObservable<T>
ます。の内容全体を保存する必要はなく、 を 2 回IObservable<T>
ループしたくないIObservable
ので、拡張メソッドを設定しました。
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
return Observable.Create<T>(o =>
{
var hot = source.Publish();
var store = new AsyncSubject<T>();
var d1 = hot.Subscribe(store);
var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
var d3 = hot.Connect();
return new CompositeDisposable(d1, d2, d3);
});
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).Wait();
}
}
Async メソッドは、渡された可能性のあるコールド オブザーバブルからホット オブザーバブルを作成します。AsyncSubject<T>
最後の要素を記憶するために をサブスクライブIObservable<T>
し、要素を探す をサブスクライブします。IObservable<T>
次に、これらの s のいずれかから最初の要素を取得します。どちらが最初に値を返すか.Amb
(メッセージAsyncSubject<T>
を取得するまで値を返しません)。.OnCompleted
私の質問は次のとおりです。
- さまざまな Observable メソッドを使用して、これをより適切に、またはより簡潔に記述できますか?
- これらの使い捨てはすべて CompositeDisposable に含める必要がありますか?
- 一致する要素が見つからずにホット オブザーバブルが完了すると、FirstAsync が例外をスローし、AsyncSubject がその値を伝播する間に競合状態が発生しますか?
- その場合、行を次のように変更する必要がありますか?
var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);
私は RX の初心者で、これが IObservable の最初の拡張機能です。
編集
私は結局一緒に行きました
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
var hot = source.Publish().RefCount();
return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).First();
}
}