ThreadPoolScheduler を使用して、Observable BlockingCollection にサブスクライバーを動的に追加および削除しようとしています。
これが私のコードの問題なのか、RX 自体の問題なのかはわかりませんが、この場合、必要に応じて購読/購読解除できるはずだと思いました。
問題を以下に貼り付けたテストに減らしました。
サブスクライバーで Dispose を呼び出してから新しいサブスクライバーを追加するまで、コードは正しく機能します。基本的に、古いスレッドが Observable をデキューしているように見えますが、Observable に対して何もしていません。
これが単体テストです。32 のサブスクライバーをセットアップし、64 のオブジェクトを追加してから、サブスクライブを解除して同じテストを繰り返します。追加のコードを削除しました (そして、登録を解除する前にスレッドが確実に実行されていることを確認するためだけに、いくつかのスリープを追加しました)
最初の 64 個は正しく処理されますが、2 番目のセットでは 32 個のオブジェクトのみがサブスクライバーに渡されます。
[TestClass]
public class RxTests
{
public class ObservTest
{
public BlockingCollection<ObserverTests.UnitTestObservable> mBlockingCollection = new BlockingCollection<ObserverTests.UnitTestObservable>();
public IObservable<ObserverTests.UnitTestObservable> mObservableBlockingCollection;
private static readonly object ObservableLock = new object();
private static volatile ObservTest ObservableInstance;
public static ObservTest Instance
{
get
{
if (ObservableInstance != null)
return ObservableInstance;
lock (ObservableLock)
{
if (ObservableInstance == null)
{
ObservTest observable = new ObservTest();
observable.mObservableBlockingCollection = observable.mBlockingCollection.GetConsumingEnumerable().ToObservable(ThreadPoolScheduler.Instance);
ObservableInstance = observable;
}
return ObservableInstance;
}
}
}
private int count = 0;
public void Release()
{
Interlocked.Increment(ref count);
Console.WriteLine("Release {0} : {1}", count, Thread.CurrentThread.ManagedThreadId);
}
public void LogCount()
{
Console.WriteLine("Total :{0}", count);
}
}
[TestMethod]
public void TestMethod1()
{
IList<IDisposable> subscribers = new List<IDisposable>();
for (int count = 0; count < 32; count++)
{
IDisposable disposable = ObservTest.Instance.mObservableBlockingCollection.Subscribe(Observe);
subscribers.Add(disposable);
}
for (int count = 0; count < 64; count++)
{
ObserverTests.UnitTestObservable observable = new ObserverTests.UnitTestObservable
{
Name = string.Format("{0}", count)
};
ObservTest.Instance.mBlockingCollection.Add(observable);
}
Thread.Sleep(5000);
foreach (IDisposable disposable in subscribers)
{
disposable.Dispose();
}
subscribers.Clear();
for (int count = 0; count < 32; count++)
{
IDisposable disposable = ObservTest.Instance.mObservableBlockingCollection.Subscribe(Observe);
subscribers.Add(disposable);
}
for (int count = 0; count < 64; count++)
{
ObserverTests.UnitTestObservable observable = new ObserverTests.UnitTestObservable
{
Name = string.Format("{0}", count)
};
ObservTest.Instance.mBlockingCollection.Add(observable);
}
Thread.Sleep(3000);
ObservTest.Instance.LogCount();
}
public static void Observe(ObserverTests.UnitTestObservable observable)
{
Console.WriteLine("Observe {0} : {1}", observable.Name, Thread.CurrentThread.ManagedThreadId);
ObservTest.Instance.Release();
}
}
したがって、出力の最終カウントは 96 ですが、128 になると予想されます。
最初のサブスクライバーの数を 32 から減らすと、処理された数が増えます。たとえば、最初のループでカウントを 32 から 16 に減らすと、カウントは 112 になります。カウントを 8 に減らすと、120 になります。
実行されるタスクの数が増えると、それらを処理できるサブスクライバーの数も増えるシステムを目指しています。