3

C# でコンシューマーを実装しようとしています。同時に実行できるパブリッシャーは多数あります。3 つの例を作成しました。1 つは Rx と subject を使用し、もう 1 つは BlockingCollection を使用し、もう 1 つは BlockingCollection の ToObservable を使用しています。この単純な例では、それらはすべて同じことを行っており、複数のプロデューサーで動作するようにしたいと考えています。

それぞれのアプローチの違いは何ですか?

私はすでに Rx を使用しているので、このアプローチをお勧めします。しかし、OnNext にはスレッド セーフの保証がなく、Subject と既定のスケジューラのキューイング セマンティクスがどのようなものかわかりません。

スレッドセーフな件名はありますか?

すべてのメッセージが処理されますか?

これが機能しない他のシナリオはありますか? 同時に処理されていますか?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}

Rx ではありませんが、簡単に使用/サブスクライブできます。アイテムを受け取り、それを処理します。これは連続して発生するはずです。

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

サブジェクトに少し似たブロッキング コレクションを使用することは、良い妥協点のように思えます。async/await を使用できるように、暗黙的にタスクにスケジュールを設定すると思いますが、それは正しいですか?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
4

1 に答える 1

7

サブジェクトはスレッドセーフではありません。同時に発行された OnNexts は Observer を同時に直接呼び出します。個人的には、Rx の他の領域が正しいセマンティクスを強制する範囲を考えると、これは非常に驚くべきことだと思います。これは、パフォーマンスを考慮して行われたとしか思えません。

ただし、サブジェクトは、OnError または OnComplete で終了を強制するという点で、中途半端な家のようなものです。これらのいずれかが発生した後、OnNext は NOP になります。そして、この動作スレッドセーフです。

ただし、サブジェクトで Observable.Synchronize() を使用すると、発信呼び出しが適切な Rx セマンティクスに従うように強制されます。特に、OnNext 呼び出しが同時に行われるとブロックされます。

基礎となるメカニズムは、標準の .NET ロックです。ロックが複数のスレッドによって競合される場合、ほとんどの場合、先着順でロックが付与されます。公平性が損なわれる特定の条件があります。ただし、探しているシリアル化されたアクセスは確実に得られます。

ObserveOn にはプラットフォーム固有の動作があります。利用可能な場合は、a を指定できSynchronizationContext、OnNext 呼び出しがそれに Post されます。スケジューラを使用すると、呼び出しを に配置しConcurrentQueue<T>、スケジューラを介してシリアルにディスパッチすることになるため、実行のスレッドはスケジューラに依存します。いずれにせよ、キューイング動作は正しいセマンティクスも強制します。

どちらの場合 (Synchronize と ObserveOn) でも、メッセージが失われることはありません。ObserveOn を使用すると、Scheduler/Context の選択によって、メッセージを処理するスレッドを暗黙的に選択できます。Synchronize を使用すると、呼び出し元のスレッドでメッセージを処理します。どちらが良いかは、シナリオによって異なります。

生産者が消費者を追い越した場合に何をしたいかなど、考慮すべきことは他にもあります。

Rxx Consume もご覧になることをお勧めします: http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

Synchronize の動作を示すサンプル コード (Nuget Rx-Testing、Nunit) - Thread.Sleep コードでは少しおかしくなっていますが、かなり手間がかかり、私は怠け者でした :):

public class SubjectTests
{
    [Test]
    public void SubjectDoesNotRespectGrammar()
    {
        var subject = new Subject<int>();
        var spy = new ObserverSpy(Scheduler.Default);
        var sut = subject.Subscribe(spy);
        // Swap the following with the preceding to make this test pass
        //var sut = subject.Synchronize().Subscribe(spy);

        Task.Factory.StartNew(() => subject.OnNext(1));
        Task.Factory.StartNew(() => subject.OnNext(2));

        Thread.Sleep(2000);

        Assert.IsFalse(spy.ConcurrencyViolation);
    }

    private class ObserverSpy : IObserver<int>
    {
        private int _inOnNext;

        public ObserverSpy(IScheduler scheduler)
        {
            _scheduler = scheduler;
        }

        public bool ConcurrencyViolation = false;
        private readonly IScheduler _scheduler;

        public void OnNext(int value)
        {
            var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);

            if (isInOnNext == 1)
            {
                ConcurrencyViolation = true;
                return;
            }

            var wait = new ManualResetEvent(false);

            _scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
            wait.WaitOne();

            _inOnNext = 0;
        }

        public void OnError(Exception error)
        {

        }

        public void OnCompleted()
        {

        }
    }
}
于 2013-05-12T21:23:45.613 に答える