6

Rx を使用して、複数の観測可能なイベントを単一のイベント セットにストリーミングする方法を確認しようとしています。しかし、以下のコードを実行すると例外が発生します。これは、Rx 文法に違反しているため、複数のオブザーバーが常に例外を起こしやすいということですか? つまり、これらの複数のオブザーバーのうち 2 つが偶然に同時にイベントを生成した場合 (2 つのオブザーバブルが同時に生成される可能性があります)、例外が発生するはずです。

DateTimeOffset start;
        object sync = new object();
        var subject = new Subject<long>();
        var observer = Observer.Create<long>(c =>
        {
            lock (sync)
            {
                Console.WriteLine(c);
            }
        })
            ;

        var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
        var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
        var observable3 = Observable.Never<long>().Timeout
            (start = DateTimeOffset.Now.AddSeconds(15),
             (new long[] { 1 }).ToObservable());
        var observable4 = Observable.Never<long>().Timeout(start);
        observable1.Subscribe(observer);
        observable2.Subscribe(observer);
        observable3.Subscribe(observer);
        observable4.Subscribe(observer);
        Thread.Sleep(20000);

説明してくれたギデオンに感謝します。これは私が得ている例外です。timeoutexception であることは間違いありません。これはコーディングミスでした。ありがとう。

System.TimeoutException: The operation has timed out.
   at System.Reactive.Observer.<Create>b__8[T](Exception e)
   at System.Reactive.AnonymousObserver`1.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Subjects.Subject`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28
e.<Throw>b__28b()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta
te, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse
rver`1 observer)
   at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()

   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
   at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54
5.<Timeout>b__53f()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche
dule>b__6(Object _)
   at System.Threading._TimerCallback.TimerCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading._TimerCallback.PerformTimerCallback(Object state)
4

2 に答える 2

6

はい、オブザーバーは複数のオブザーバブルをリッスンできます。これの最も良い例は、Merge演算子です。組み込み演算子はすべて RX 文法に従い、多くの場合、RX 文法に従っていないソースに強制します。

あなたIObserverが得るのObserver.Createはそのようなケースの1つです。OnError または OnCompleted が呼び出されると、以降の OnNext への呼び出しは無視されます。これは、同じオブザーバーを使用して 1 つのオブザーバブルをサブスクライブし、最初のオブザーバブルの後に別のオブザーバブルをサブスクライブしても機能しないことを意味します。これは、最初のオブザーバブルからの終了メッセージにより、オブザーバーが 2 番目のオブザーバブルからのメッセージを無視するためです。これを回避するためにMerge、 、Concat、およびOnErrorResumeNext(とりわけ) などの演算子は内部で複数のオブザーバーを使用し、最後のオブザーバブル以外からの完了メッセージ (オペレーターのセマンティクスに応じて OnError および/または OnCompleted) を外側のオブザーバーに渡しません。

取得している例外については言及していませんが、取得している Timeout に起因するエラーだと思いますobservable4。タイムアウトに使用する別のオブザーバブルを提供しない場合、オブザーバーの が呼び出され、エラー ハンドラーを取らないおよびオーバーロードOnErrorのデフォルトOnErrorは、単純に例外をスローすることです。SubscribeObserver.Create

OnNextこれは明らかにサンプル/テスト コードですが、 にメッセージが渡されなくなっても、他のすべてのオブザーバブルはこの例外の後も実行され続けることを指摘したいと思います。Mergeこれを追跡するために 使用するか、説明からすべての使い捨てを追跡し、完了メッセージが表示されたら自分で処分してください。CompositeDisposable(中System.Reactive.Disposables)はこれに適しています。

于 2011-12-13T20:59:26.120 に答える
2

ここでロックを使用するべきではありませんが、本当にこれを機能させたい場合は、次のようにします。

var x = Observable.Create<T>(subj => { /* Fill it in*/ })
    .Multicast(new Subject<T>());

// Set up your subscriptions Here!

// When you call the Connect, whatever is in the Observable.Create will be called
x.Connect();

さらに安全にしたい場合は、サブジェクトの代わりに ReplaySubject を使用して、Create の結果が将来のサブスクリプションのために「リプレイ」されるようにすることができます (一方、サブジェクトでは、Connect ののサブスクライバーは取得されますなし)

于 2011-12-17T01:31:45.843 に答える