4

こんにちは' 101 Rx の例の 1 つを試してみました。

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

「購読を解除するには何かキーを押してください」という行が表示されない理由がわかりません。私の理解では、サブスクライブは非同期であり、サブスクライブするとすぐに返されます。メインスレッドがブロックされる原因は何ですか?

4

2 に答える 2

7

ブロッキングは、列挙可能なループオンwhile (true)とデフォルトのIEnumerable<T>.ToObservable()拡張メソッドの組み合わせによって発生しますCurrentThreadScheduler

のオーバーロードを提供する場合Scheduler.TaskPool(またはScheduler.ThreadPool.NET 4より前の場合)、ToObservable期待する動作が表示されるはずです(ただし、メインスレッドでサブスクライバーを呼び出すことはありません、FYI)。

そうは言っても、あなたはあなたの組み合わせを見つけて、あなたが期待するように働くとThread.Sleep思います。Throttleスケジューラーを使用して遅延をスケジュールするカスタムオブザーバブルを作成する方がおそらく良いでしょう。

于 2011-02-25T10:35:47.967 に答える
2

私はリチャードに同意します。

の実装は.ToObservable()次のようになります。

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

and を使用して.ToObservable(IScheduler)オーバーロードを呼び出しているため、コードがメソッドを超える前にオブザーバブルが完了しなければならない遅延を引き起こしています。すべてが単一のスレッドで実行された場合、このコードがどのように動作するかを考えてみてください (実際はそうです)。Scheduler.CurrentThread.Sleep(...).Subscribe(...)

これを回避するには、リチャードが提案するようにタスク プールまたはスレッド プール スケジューラを使用できますが、コードにはもっと根本的な問題があると思います。つまり、「古い学校」のスレッド スリープを使用しており、代わりに Rx メソッドに依存していないということです。

これを試して、オブザーバブルを生成してください:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...)メソッドが行ったすべてのことを行いますGenerateAlternatingFastAndSlowEventsが、オブザーバブルを直接作成しScheduler.ThreadPool、ボンネットの下を使用して行うため、スケジューラを指定する必要はありません。

于 2011-02-25T10:48:40.743 に答える