C# でコンシューマーを実装しようとしています。同時に実行できるパブリッシャーは多数あります。3 つの例を作成しました。1 つは Rx と subject を使用し、もう 1 つは BlockingCollection を使用し、もう 1 つは BlockingCollection の ToObservable を使用しています。この単純な例では、それらはすべて同じことを行っており、複数のプロデューサーで動作するようにしたいと考えています。
それぞれのアプローチの違いは何ですか?
私はすでに Rx を使用しているので、このアプローチをお勧めします。しかし、OnNext にはスレッド セーフの保証がなく、Subject と既定のスケジューラのキューイング セマンティクスがどのようなものかわかりません。
スレッドセーフな件名はありますか?
すべてのメッセージが処理されますか?
これが機能しない他のシナリオはありますか? 同時に処理されていますか?
void SubjectOnDefaultScheduler()
{
var observable = new Subject<long>();
observable.
ObserveOn(Scheduler.Default).
Subscribe(i => { DoWork(i); });
observable.OnNext(1);
observable.OnNext(2);
observable.OnNext(3);
}
Rx ではありませんが、簡単に使用/サブスクライブできます。アイテムを受け取り、それを処理します。これは連続して発生するはずです。
void BlockingCollectionAndConsumingTask()
{
var blockingCollection = new BlockingCollection<long>();
var taskFactory = new TaskFactory();
taskFactory.StartNew(() =>
{
foreach (var i in blockingCollection.GetConsumingEnumerable())
{
DoWork(i);
}
});
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
サブジェクトに少し似たブロッキング コレクションを使用することは、良い妥協点のように思えます。async/await を使用できるように、暗黙的にタスクにスケジュールを設定すると思いますが、それは正しいですか?
void BlockingCollectionToObservable()
{
var blockingCollection = new BlockingCollection<long>();
blockingCollection.
GetConsumingEnumerable().
ToObservable(Scheduler.Default).
Subscribe(i => { DoWork(i); });
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}