1

IObservable<T>述語に一致する要素を探して実行したいのですが、見つからない場合は、 の最後の要素を返しIObservable<T>ます。の内容全体を保存する必要はなく、 を 2 回IObservable<T>ループしたくないIObservableので、拡張メソッドを設定しました。

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return Observable.Create<T>(o =>
        {
            var hot = source.Publish();
            var store = new AsyncSubject<T>();
            var d1 = hot.Subscribe(store);
            var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
            var d3 = hot.Connect();
            return new CompositeDisposable(d1, d2, d3);
        });
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).Wait();
    }
}

Async メソッドは、渡された可能性のあるコールド オブザーバブルからホット オブザーバブルを作成します。AsyncSubject<T>最後の要素を記憶するために をサブスクライブIObservable<T>し、要素を探す をサブスクライブします。IObservable<T>次に、これらの s のいずれかから最初の要素を取得します。どちらが最初に値を返すか.Amb(メッセージAsyncSubject<T>を取得するまで値を返しません)。.OnCompleted

私の質問は次のとおりです。

  • さまざまな Observable メソッドを使用して、これをより適切に、またはより簡潔に記述できますか?
  • これらの使い捨てはすべて CompositeDisposable に含める必要がありますか?
  • 一致する要素が見つからずにホット オブザーバブルが完了すると、FirstAsync が例外をスローし、AsyncSubject がその値を伝播する間に競合状態が発生しますか?
  • その場合、行を次のように変更する必要がありますか?

var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);

私は RX の初心者で、これが IObservable の最初の拡張機能です。

編集

私は結局一緒に行きました

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var hot = source.Publish().RefCount();
        return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).First();
    }
}
4

2 に答える 2

1

一緒に欲しい2つのケースをアンブすることができます。観測可能なソースが冷たい場合は、を実行できますPublish|Refcount

    public static IObservable<T> FirstOrLast<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.TakeLast(1).Amb(source.Where(predicate).Take(1));
    }

テスト:

        var source = Observable.Interval(TimeSpan.FromSeconds(0.1))
                               .Take(10)
                               .Publish()
                               .RefCount();

        FirstOrLast(source, i => i == 5).Subscribe(Console.WriteLine); //5
        FirstOrLast(source, i => i == 11).Subscribe(Console.WriteLine); //9
于 2012-09-05T07:30:00.653 に答える
0

機能する「より単純な」クエリを作成しようとしましたが、これまでのところ何もありません。

私があなたの基本的な構造に固執するなら、私はわずかな改善を提供することができます. これを試して:

public static IObservable<T> FirstOrLastAsync<T>(
    this IObservable<T> source, Func<T, bool> pred)
{
    return Observable.Create<T>(o =>
    {
        var hot = source.Publish();
        var store = new AsyncSubject<T>();
        var d1 = hot.Subscribe(store);
        var d2 =
            hot
                .Where(x => pred(x))
                .Concat(store)
                .Take(1)
                .Subscribe(o);
        var d3 = hot.Connect();
        return new CompositeDisposable(d1, d2, d3);
    });
}

それほど優れているわけではありませんが、使用するよりも気に入っていますAmb。それは私が思うに少しきれいです。

于 2012-09-05T08:26:57.503 に答える