7

私はこのコードを持っています:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

OnError(new OperationCanceledException()) を使用して問題を解決しましたが、より良い解決策が必要です (コンビネーターが必要ですか?)。

4

4 に答える 4

8

いずれかのストリームが完了したときに終了するようにMergeを書き直す代わりに、onCompletedイベントをonNextイベントに変換し、var ss = s1.Merge(s2).TakeUntil(s1ors2complete)s1またはs2のいずれかが終了したときにs1ors2completeが値を生成する場所を使用することをお勧めします。.TakeUntil(s1completes).TakeUntil(s2completes)s1ors2completeを作成する代わりに、チェーンすることもできます。このアプローチは、MergeWithCompleteOnEither拡張機能よりも優れた構成を提供します。これは、「両方が完了したときに完了する」演算子を「完了したときに完了する」演算子に変更するために使用できるためです。

onNextイベントをonCompletedイベントに変換する方法については、いくつかの方法があります。CompositeDisposableメソッドは良いアプローチのように聞こえます。少し検索すると、onNext、onError、およびonCompleted通知間の変換に関するこの興味深いスレッドが見つかります。を使用してReturnTrueOnCompletedという拡張メソッドを作成するxs.SkipWhile(_ => true).concat(Observable.Return(True))と、マージは次のようになります。

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

また、入力ストリームの1つが完了すると自動的に完了するZipなどの演算子を使用することもできます。

于 2011-02-04T08:32:03.630 に答える
8

または、これも非常にきちんとしています。

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

これは、CreateWithDisposable(); で OnCompleted が 1 つだけオブザーバーにプッシュされるようにするサブジェクトを使用します。

于 2011-02-03T15:58:54.547 に答える
2

どちらのストリームの出力も必要ないと仮定すると、次Ambの魔法と組み合わせて使用​​できますMaterialize

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

値が必要な場合はDo、2つの主題で使用できます。

于 2011-02-03T16:33:17.447 に答える
0

これを試して:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

これは、ソースまたは適切なソースのいずれかが完了したときに例外をスローするIObservableを連結します。次に、Catch拡張メソッドを使用して、空のObservableを返し、いずれかが完了したときにストリームを自動的に完了することができます。

于 2011-02-03T15:47:44.157 に答える