1

RX初心者です。

IEnumerable をトラバースし、それぞれのスレッドでデータを処理する複数の DataHandlers に公開したいと思います。

以下は私のサンプルプログラムです。パブリッシュは機能し、新しいスレッドが作成されますが、3 つの RowHandler はすべて 1 つのスレッドで実行されています。私は3つのスレッドが必要です。これを実装する最良の方法は何ですか?

class Program
{

    public class MyDataGenerator
    {
        public IEnumerable<int> myData()
        {
            //Heavy lifting....Don't want to process more than once.
            yield return 1;
            yield return 2;
            yield return 3;
            yield return 4;
            yield return 5;
            yield return 6;
        }
    }

    static void Main(string[] args)
    {
        MyDataGenerator h = new MyDataGenerator();
        Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
        //
        var shared = h.myData().ToObservable().Publish();
        ///////////////////////////////
        //  Row Handling Requirements
        //
        //      1. Single Scan of IEnumerable. 
        //      2. Row handlers process data in their own threads. 
        //      3. OK if scanning thread blocks while data is processed
        //

        //Create the RowHandlers
        MyRowHandler rn1 = new MyRowHandler();
        rn1.ido = shared.Subscribe(i => rn1.processID(i));
        MyRowHandler rn2 = new MyRowHandler();
        rn2.ido = shared.Subscribe(i => rn2.processID(i));
        MyRowHandler rn3 = new MyRowHandler();
        rn3.ido = shared.Subscribe(i => rn3.processID(i));
        //
        shared.Connect();
    }

    public class MyRowHandler
    {
        public IDisposable ido = null;
        public void processID(int i)
        {
            var o = Observable.Start(() =>
                                        {
                                            Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
                                            Thread.Sleep(30);
                                            Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
                                        }
                                    );
            o.First(); 
        }
    }
}

発見 :

Rx から受け取るコーディング速度とコード品質の向上は、パフォーマンスを犠牲にして行われます。タスク/デリゲートは間違いなく倍速です。つまり、Rx について学ぶ必要がある最も重要なことは、いつ Rx を使用するかということです。以下は要約ガイドラインのドラフトです。大規模なボリュームの場合、チャンク、結合、およびその他の多くのストリームが多数のハンドラー モデルで Rx が使用されていることがわかります。ただし、基本的な非同期では rx を使用しないでください。

マトリックス ガイドラインを含む画像を投稿したいのですが、サイトで画像を投稿できません。

4

1 に答える 1

3

シーケンス要件を正しく理解し、3 つの並行実行スキャンが必要な場合は、TaskPool を観察して、そこからサブスクライブできます。

...

//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i));

...

その後、非同期で実行され、メインスレッドはスキャンが完了するのを待たないため、たとえばプログラムの最後に a を配置しない限り、プログラムはすぐに終了することに注意してくださいConsole.ReadKey()

編集:同じスレッドを「ずっと」実行することに関して、あなたはそのために少し奇妙にスケジュールしています。オブザーバブルを行ハンドラにドロップすると、Scheduler.NewThread を使用して良好な結果を得ることができます。

...

var rowHandler1 = new MyRowHandler();
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID);

...

public void ProcessID(int i)
{
    Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
    Thread.Sleep(30);
    Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture));
}

これにより、各サブスクリプションに独自のスレッドが与えられ、それにとどまります。

于 2012-04-14T06:35:44.613 に答える