2

私はリアクティブ拡張機能を学習しています。最近、この状況に遭遇しました。コードは次のとおりです。

    class Program
{
    private static void Main(string[] args)
    {
        var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();
        ls.Select(m => new
            {
                t = Observable.Start(() =>
                    {
                        Thread.Sleep(100);
                        return new Random().Next(3, 20);
                    }),
                i = m
            }).Subscribe(item => item.t.Subscribe(Console.WriteLine));
        Task.WaitAll();
        Console.WriteLine("all done");
        Console.ReadKey();
    }
}

Observable に IObservable があることを示しており、すべてのプロセスが完了した後に「すべて完了」と出力したいのですが、うまくいきませんでした。プログラムが開始するとすぐに「すべて完了」と表示され、もう待つ必要はありません。私の状況では、 REAL WaitAllを取得するにはどうすればよいですか?

4

2 に答える 2

2

それは実際には Rx の仕組みではありません。Task.WaitAll()と Rx コードの間にリンクはありません。メソッドにタスクを渡すことさえしませんWaitAll();-)

まず、このSubscribeメソッドはノンブロッキングです。この時点で、このシーケンスの消費を開始したいと述べているだけで、値/エラー/完了の通知が送信されたときに何をすべきかを示しています。

ネストされた Observable シーケンスは、すぐに飛び込むにはかなり高度なトピックですが、問題なく作業できます。

class Program
{
    private static void Main(string[] args)
    {
        //Let go, we are not IEnumerable any more :-)
        var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();

        var subscription = ls.Select(_ =>
                Observable.Start(() =>
                    {
                        Thread.Sleep(100);
                        return new Random().Next(3, 20);
                    })
             })
            .Merge() //Merge the IO<IO<T>> into Io<T> so we get a single completion.
            .Subscribe(
                item => item.Subscribe(Console.WriteLine),
                ()=>Console.WriteLine("all done"));


        Console.ReadKey();
        subscription.Dispose();
    }
}

Observable.Start+をや などThread.Sleepの Rx メソッドに置き換えることで、コードをさらに改善できます。Observable.TimerScheduler

ここで重要なことは、Rx が Async であることです。ポイントは、ブロックしないことです。このコードでブロックしているのはThread.SleepとだけConsole.ReadKey()です。理想的には、上記のように、Thread.Sleepとにかく交換します。

于 2013-09-17T21:40:36.327 に答える
2

ここでは、「Rx 以外の」種類のコーディングを少し行っているようです。実行しようとしているタスクは、実際には非常に簡単です。

最初に、何らかの作業を行った後に値を生成するコードがいくつかあります。私はそれを次のように再コーディングしました:

var rnd = new Random();
Func<int> produceValue = () =>
{
    Thread.Sleep(100);
    return rnd.Next(3, 20);
};

これにより、Rx コードとは別の状態に保たれます。new Random()補足として、新しいインスタンスをインスタンス化し続けるのは正しくないため、宣言を関数の外に引き出しましたRandom。そのように乱数を取得するとは限りません。また、一度インスタンス化して、同じインスタンスを使用する必要があります。

したがって、オブザーバブルを生成するコードは簡単です。

var query =
    from n in Observable.Range(1, 9)
    from m in Observable.Start(produceValue)
    select m;

また、購読も簡単です。

query.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("All Done."));

厄介なコードなしで、あなたがコーディングしようとしていたことを正確に行うと思いWaitForます。

于 2013-09-18T04:05:52.817 に答える