5

最近、システムを RX 1.11111 から RX 2.0 に移植したところ、この問題が発見されました。次のように ObserveOn に EventLoopScheduler を使用します。

IDisposable subscription = someSubject
    .ObserveOn(m_eventLoopScheduler)
    .SomeMoreRXFunctions()
    .Subscribe((something)=>something)

スケジューラは、アプリケーションの終了時に破棄されます ( m_eventLoopScheduler.Dispose)。その前に、オブザーバブル ( subscription.Dispose) へのすべてのサブスクリプションを破棄します。

それにもかかわらずObjectDisposedExceptionEventLoopScheduler.Schedule. RX スレッドで発生したため、その例外をキャッチすることはできません。Dispose がキュー内のすべてのアイテムを削除しないようです。

への呼び出しを削除しようとしたところEventLoopScheduler.Dispose、例外が消えました。SomeMoreRXFunctions()しかし、すべてのサブスクリプションが破棄されたにもかかわらず、コードはさらに約 10 回実行されました。

を適切に閉じる他の方法はありEventLoopSchedulerますか?

4

4 に答える 4

9

サブスクリプションに関するいくつかの観察

(申し訳ありませんが、駄洒落に抵抗できませんでした!)IObservable<out T>は、ほぼすべての Rx オペレーターによって実装されているインターフェイスであり、重要なメソッドが 1 つだけあります。

IDisposable Subscribe(IObserver<T> observer);

IObserver<T>オブザーバー ( を実装) がサブスクリプションの開始時と終了時を判断できるのは、純粋にこのメソッドとその戻り値の処理によるものです。

チェーンの一部である Observable にサブスクリプションが作成されると、一般に(直接的または間接的に)、チェーンのさらに上のサブスクリプションになります。これが発生するかどうか、いつ発生するかは、その指定された Observable に依存します。

多くの場合、受信したサブスクリプションと行われたサブスクリプションの関係は 1 対 1 ではありません。この例は Publish() です。これは、受け取るサブスクリプションの数に関係なく、そのソースへのサブスクリプションを最大で 1 つだけ持ちます。これこそがパブリッシュの要点です。

それ以外の場合、関係には一時的な側面があります。たとえば、Concat() は、最初のストリームがサブスクライブするまで、2 番目のストリームをサブスクライブしOnCompleted()ません。

ここでRx Design Guidelinesを調べてみる価値はあります。

Rx 設計ガイドライン

4.4. Unsubscribe ですべての未処理の作業を停止するための最善の努力を想定します。監視可能なサブスクリプションで unsubscribe が呼び出されると、監視可能なシーケンスはすべての未処理の作業を停止するために最善を尽くします。これは、開始されていないキューに入れられた作業は開始されないことを意味します。

進行中の作業を中止することが常に安全であるとは限らないため、既に進行中の作業は完了する可能性があります。この作業の結果は、以前にサブスクライブしたオブザーバー インスタンスには通知されません。

結論

ここでの意味に注意してください。肝心なのは、アップストリームのサブスクリプションが作成または破棄される可能性がある場合、それは完全に Observable の実装にかかっているということです。言い換えれば、サブスクリプションを破棄することで、Observable が直接的または間接的に行ったサブスクリプションの一部またはすべてを破棄するという保証はまったくありません。これは、オペレーターまたはそのアップストリーム サブスクリプションによって使用されるその他のリソース (スケジュールされたアクションなど) にも当てはまります。

あなたが期待できる最善のことは、すべてのアップストリーム オペレーターの作成者が、すべての未解決の作業を停止するために実際に最善の努力をしたことです。

質問に戻る(ついに!)

Iの内容を確認しSomeMoreRXFunctionsないと確信が持てませんが、あなたが知っているサブスクリプションを処分したにも関わらず、スケジューラを処分することで、まだ実行中のサブスクリプションのフィート。事実上、あなたはこれを引き起こしています:

void Main()
{
    var scheduler = new EventLoopScheduler();

    // Decide it's time to stop
    scheduler.Dispose();

    // The next line will throw an ObjectDisposedException
    scheduler.Schedule(() => {});
}

この問題を引き起こす可能性のある完全に合理的な演算子を作成するのは簡単です。スケジューラーを直接使用しない演算子であっても! このことを考慮:

public static class ObservableExtensions
{
    public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
        (this IObservable<TSource> source, IObservable<TDelay> delay)
    {
        return Observable.Create<TSource>(observer =>
        {        
            var subscription = new SerialDisposable();
            subscription.Disposable = delay
                .IgnoreElements()
                .Subscribe(_ => {}, () => {
                    Console.WriteLine("Waiting to subscribe to source");
                    // Artifical sleep to create a problem
                    Thread.Sleep(TimeSpan.FromSeconds(2));
                    Console.WriteLine("Subscribing to source");
                    // Is this line safe?
                    subscription.Disposable = source.Subscribe(observer);
                }); 
            return subscription;
        });
    }    
}

このオペレーターは、渡された遅延オブザーバブルが完了すると、ソースにサブスクライブします。それがいかに合理的であるかを見てください - それは aSerialDisposableを使用して、そのオブザーバーへの 2 つの基本的な一時的に分離されたサブスクリプションを単一の使い捨てとして正しく提示します。

ただし、この演算子を覆して例外を発生させるのは簡単です。

void Main()
{
    var scheduler = new EventLoopScheduler();
    var rx = Observable.Range(0, 10, scheduler)
                       .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
    var subs = rx.Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(2));
    subs.Dispose();
    scheduler.Dispose();    
}

ここで何が起こっているのですか?RangeEventLoopScheduler で を作成していますが、デフォルトのスケジューラを使用して作成された遅延ストリームをアタッチしていますReasonableDelayTimer

ここでサブスクライブし、遅延ストリームが完了するまで待ってから、サブスクリプションと EventLoopScheduler を「正しい順序」で破棄します。

私が挿入した人為的な遅延Thread.Sleepにより、自然に発生しやすい競合状態が保証されます。遅延は完了し、サブスクリプションはRange破棄されましたが、破棄された EventLoopScheduler にオペレーターがアクセスするのを防ぐには遅すぎます。

遅延部分が完了したら、オブザーバーがサブスクライブを解除したかどうかを確認するための合理的な努力を強化することもできます。

// In the ReasonableDelay method
.Subscribe(_ => {}, () => {        
    if(!subscription.IsDisposed) // Check for unsubscribe
    {
        Console.WriteLine("Waiting to subscribe to source");
        // Artifical sleep to create a problem            
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine("Subscribing to source");
        // Is this line safe?                    
        subscription.Disposable = source.Subscribe(observer);
    }
}); 

役に立ちません。この演算子のコンテキストでロック セマンティクスを純粋に使用する方法もありません。

あなたが間違っていること

その EventLoopScheduler を処分する必要はありません。他の Rx オペレーターにそれを渡すと、その責任を引き継いだことになります。ガイドラインに従ってサブスクリプションをできるだけタイムリーにクリーンアップするのは Rx オペレーター次第です。これは、直接的または間接的に EventLoopScheduler で保留中のスケジュール済みアイテムをキャンセルし、それ以上のスケジューリングを停止して、キューができるだけ早く空になるようにすることを意味します。可能。

上記の例では、複数のスケジューラのやや不自然な使用と ReasonableDelay での強制的なスリープが問題の原因であると考えることができますが、オペレータがすぐにクリーンアップできないという真のシナリオを想像することは難しくありません。

基本的に、Rx スケジューラを破棄することで、Rx と同等のスレッド アボートを実行します。そのシナリオと同様に、例外を処理する必要がある場合があります。

正しいことは、不可解なものを引き離し、SomeMoreRXFunctions()合理的に可能な限りガイドラインに準拠していることを確認することです.

于 2013-10-31T22:15:10.423 に答える
2

この質問へのリンクとして、この質問に気付きました: Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException after dispose

ここで行ったことをここに再投稿します-スケジューラを「フラッシュ」する方法はわかりませんが、避けられない「オブジェクトが破棄された」例外を次のようにラップ/処理できます。

EventLoopScheduler scheduler = new EventLoopScheduler();
var wrappedScheduler = scheduler.Catch<Exception>((ex) => 
{
    Console.WriteLine("Got an exception:" + ex.ToString());
    return true;
});

for (int i = 0; i < 100; ++i)
{
    var handle = Observable.Interval(TimeSpan.FromMilliseconds(1))
                           .ObserveOn(wrappedScheduler)
                           .Subscribe(Observer.Create<long>((x) => Thread.Sleep(1000)));

    handles.Add(handle);
}
于 2013-03-28T17:25:52.080 に答える
2

私たちは同じ問題に遭遇し、例外なく EventLoopScheduler を破棄するために次のことを行うことになりました。

scheduler.Schedule(() => scheduler.Dispose());

これを実行する前にすべてのサブスクリプションを適切に破棄すると (実行したと言う)、Dipose() 呼び出しは最後にスケジュールされた操作であり、他のすべての保留中の操作は Dispose が呼び出される前に完了することができます。

これをより堅牢/再利用可能にするために、すべての操作を委任し、上に示すように Dispose を実装する EventLoopScheduler をラップする独自の IScheduler 実装を作成できます。その上、Schedule メソッドにガードを実装して、Dispose が呼び出された後にアクションをスケジュールしないようにすることができます (たとえば、一部のオブザーバーのサブスクライブを解除するのを忘れた場合など)。

于 2015-08-13T12:33:40.453 に答える
0

部分的に解決しました。ケースは、ここに示すよりも複雑でした。チェーンは次のようになりました。

var published = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions().Publish();

IDisposabledisposable1 = published.Connect();

IDisposabledisposable2 = published.Subscribe((何か)=>何か);

使い捨て1と使い捨て2の両方を破棄すると、 SomeMoreRXFunctions() のコードは実行されなくなりました。一方、スケジューラ自体を破棄しようとすると、同じ例外がスローされます。

残念ながら、より単純なコードで問題を再現することはできません。それはおそらく、私が見逃している何かがあることを示しています。

これは私たちが受け入れることができる解決策ですが、例外の可能性なしにスケジューラを一度に閉じるより良いものを見つけたいと思っています.

于 2012-10-30T14:27:49.433 に答える