サブスクリプションに関するいくつかの観察
(申し訳ありませんが、駄洒落に抵抗できませんでした!)IObservable<out T>
は、ほぼすべての Rx オペレーターによって実装されているインターフェイスであり、重要なメソッドが 1 つだけあります。
IDisposable Subscribe(IObserver<T> observer);
IObserver<T>
オブザーバー ( を実装) がサブスクリプションの開始時と終了時を判断できるのは、純粋にこのメソッドとその戻り値の処理によるものです。
チェーンの一部である Observable にサブスクリプションが作成されると、一般に(直接的または間接的に)、チェーンのさらに上のサブスクリプションになります。これが発生するかどうか、いつ発生するかは、その指定された Observable に依存します。
多くの場合、受信したサブスクリプションと行われたサブスクリプションの関係は 1 対 1 ではありません。この例は Publish() です。これは、受け取るサブスクリプションの数に関係なく、そのソースへのサブスクリプションを最大で 1 つだけ持ちます。これこそがパブリッシュの要点です。
それ以外の場合、関係には一時的な側面があります。たとえば、Concat() は、最初のストリームがサブスクライブするまで、2 番目のストリームをサブスクライブしOnCompleted()
ません。
ここでRx Design Guidelinesを調べてみる価値はあります。
Rx 設計ガイドライン
4.4. Unsubscribe ですべての未処理の作業を停止するための最善の努力を想定します。監視可能なサブスクリプションで unsubscribe が呼び出されると、監視可能なシーケンスはすべての未処理の作業を停止するために最善を尽くします。これは、開始されていないキューに入れられた作業は開始されないことを意味します。
進行中の作業を中止することが常に安全であるとは限らないため、既に進行中の作業は完了する可能性があります。この作業の結果は、以前にサブスクライブしたオブザーバー インスタンスには通知されません。
結論
ここでの意味に注意してください。肝心なのは、アップストリームのサブスクリプションが作成または破棄される可能性がある場合、それは完全に Observable の実装にかかっているということです。言い換えれば、サブスクリプションを破棄することで、Observable が直接的または間接的に行ったサブスクリプションの一部またはすべてを破棄するという保証はまったくありません。これは、オペレーターまたはそのアップストリーム サブスクリプションによって使用されるその他のリソース (スケジュールされたアクションなど) にも当てはまります。
あなたが期待できる最善のことは、すべてのアップストリーム オペレーターの作成者が、すべての未解決の作業を停止するために実際に最善の努力をしたことです。
質問に戻る(ついに!)
Iの内容を確認しSomeMoreRXFunctions
ないと確信が持てませんが、あなたが知っているサブスクリプションを処分したにも関わらず、スケジューラを処分することで、まだ実行中のサブスクリプションのフィート。事実上、あなたはこれを引き起こしています:
void Main()
{
var scheduler = new EventLoopScheduler();
// Decide it's time to stop
scheduler.Dispose();
// The next line will throw an ObjectDisposedException
scheduler.Schedule(() => {});
}
この問題を引き起こす可能性のある完全に合理的な演算子を作成するのは簡単です。スケジューラーを直接使用しない演算子であっても! このことを考慮:
public static class ObservableExtensions
{
public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
(this IObservable<TSource> source, IObservable<TDelay> delay)
{
return Observable.Create<TSource>(observer =>
{
var subscription = new SerialDisposable();
subscription.Disposable = delay
.IgnoreElements()
.Subscribe(_ => {}, () => {
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
});
return subscription;
});
}
}
このオペレーターは、渡された遅延オブザーバブルが完了すると、ソースにサブスクライブします。それがいかに合理的であるかを見てください - それは aSerialDisposable
を使用して、そのオブザーバーへの 2 つの基本的な一時的に分離されたサブスクリプションを単一の使い捨てとして正しく提示します。
ただし、この演算子を覆して例外を発生させるのは簡単です。
void Main()
{
var scheduler = new EventLoopScheduler();
var rx = Observable.Range(0, 10, scheduler)
.ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
var subs = rx.Subscribe();
Thread.Sleep(TimeSpan.FromSeconds(2));
subs.Dispose();
scheduler.Dispose();
}
ここで何が起こっているのですか?Range
EventLoopScheduler で を作成していますが、デフォルトのスケジューラを使用して作成された遅延ストリームをアタッチしていますReasonableDelay
。Timer
ここでサブスクライブし、遅延ストリームが完了するまで待ってから、サブスクリプションと EventLoopScheduler を「正しい順序」で破棄します。
私が挿入した人為的な遅延Thread.Sleep
により、自然に発生しやすい競合状態が保証されます。遅延は完了し、サブスクリプションはRange
破棄されましたが、破棄された EventLoopScheduler にオペレーターがアクセスするのを防ぐには遅すぎます。
遅延部分が完了したら、オブザーバーがサブスクライブを解除したかどうかを確認するための合理的な努力を強化することもできます。
// In the ReasonableDelay method
.Subscribe(_ => {}, () => {
if(!subscription.IsDisposed) // Check for unsubscribe
{
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
}
});
役に立ちません。この演算子のコンテキストでロック セマンティクスを純粋に使用する方法もありません。
あなたが間違っていること
その EventLoopScheduler を処分する必要はありません。他の Rx オペレーターにそれを渡すと、その責任を引き継いだことになります。ガイドラインに従ってサブスクリプションをできるだけタイムリーにクリーンアップするのは Rx オペレーター次第です。これは、直接的または間接的に EventLoopScheduler で保留中のスケジュール済みアイテムをキャンセルし、それ以上のスケジューリングを停止して、キューができるだけ早く空になるようにすることを意味します。可能。
上記の例では、複数のスケジューラのやや不自然な使用と ReasonableDelay での強制的なスリープが問題の原因であると考えることができますが、オペレータがすぐにクリーンアップできないという真のシナリオを想像することは難しくありません。
基本的に、Rx スケジューラを破棄することで、Rx と同等のスレッド アボートを実行します。そのシナリオと同様に、例外を処理する必要がある場合があります。
正しいことは、不可解なものを引き離し、SomeMoreRXFunctions()
合理的に可能な限りガイドラインに準拠していることを確認することです.