1

Reactive Extensions は、さまざまな演算子を何回評価することになっていますか?

次のテストコードがあります。

var seconds = Observable
    .Interval(TimeSpan.FromSeconds(5))
    .Do(_ => Console.WriteLine("{0} Generated Data", DateTime.Now.ToLongTimeString()));

var split = seconds
    .Do(_ => Console.WriteLine("{0}  Split/Branch received Data", DateTime.Now.ToLongTimeString()));

var merged = seconds
    .Merge(split)
    .Do(_ => Console.WriteLine("{0}   Received Merged data", DateTime.Now.ToLongTimeString()));

var pipeline = merged.Subscribe();

これにより、5 秒ごとに「生成されたデータ」が書き込まれることを期待しています。次に、そのデータを「Split/Branch received Data」を書き込む「split」ストリームと「Received Merged data」を書き込む「merged」ストリームの両方に渡します。最後に、「結合された」ストリームは「分割された」ストリームからも受信しているため、データを 2 回受信し、「結合された受信データ」を 2 回書き込みます。(それらのいくつかを書く順序は特に関係ありません)

しかし、私が得ている出力は次のとおりです。

8:29:56 AM Generated Data
8:29:56 AM Generated Data
8:29:56 AM  Split/Branch received Data
8:29:56 AM   Received Merged data
8:29:56 AM   Received Merged data
8:30:01 AM Generated Data
8:30:01 AM Generated Data
8:30:01 AM  Split/Branch received Data
8:30:01 AM   Received Merged data
8:30:01 AM   Received Merged data

「生成されたデータ」を2回書き込んでいます。私の理解では、「seconds」IObservable にサブスクライブされているダウンストリーム オブザーバーの数は、「Generated Data」が書き込む回数 (ONCE である必要があります) には影響しませんが、影響します。なんで?

: .Net Framework 3.5 環境でリアクティブ拡張機能の安定版リリース v1.0 SP1 を使用しています。

4

4 に答える 4

2

おそらく、彼らはそのアプローチを選択して、各サブスクライバーが最初のサブスクリプションから同じ間隔で値を取得できるようにします。代替インターバルがどのように機能するかを検討してください。

0s - First subscriber subscribes
5s - Value: 0
8s - Second subscriber subscribes
10s - Value: 1
15s - Value: 2
17s - Unsubscribe both

最終的には次のようになります。

First  -----0----1----2-|
Second         --1----2-|

この場合、2 つのオブザーバーは、他のオブザーバーが既にアタッチされているかどうかによって、結果が著しく異なります。実装されているIntervalため、注文や過去の加入者に関係なく、各加入者に同じエクスペリエンスを提供します。

つまり、オブザーバブルを作成するときにInterval追加することで、記述した動作に「変換」できます。.Publish().RefCount()seconds

于 2012-09-07T14:32:14.667 に答える
1

シーケンスがすべてのステップでマルチキャストされていると、場合によってはいいように思えますが、そうすると、Rx が許可するリッチな構成を持つことができなくなります。

別の言い方をすれば、 IObservableのプッシュベースのデュアルですIEnumerableIEnumerableには遅延評価のプロパティがあります。値は、 を移動し始めるまで計算されませんEnumerator。Rx シーケンスは遅延して構成され、最後に Subscribe() (For-Each に相当する Observable) がシーケンスを実現します。

このようにして、最後のステージからサブスクライブを解除するだけですべてのステージでパイプラインを停止できるため、個々のサブスクリプションを管理するという悪夢を経験することなく、ファイア アンド フォーゲットの動作を実行できます。

于 2012-09-07T16:54:14.077 に答える
0

関連するメモとして、遅延評価された列挙可能なシーケンスとの Asti のアナロジーを示す頭の体操があります。

private static Random s_rand = new Random();

public static IEnumerable<int> Rand()
{
    while (true)
        yield return s_rand.Next();
}

public static void Main()
{
    var xs = Rand();

    var res = xs.Zip(xs, (l, r) => l == r).All(b => b);

    Console.WriteLine(res);
}

ランダムなシーケンスをそれ自体で Zip する場合、要素のすべてのペアが同じであると予想しますか (つまり、上記のコードが永久に実行されることになります)? または、何らかの理由でコードが終了して false を出力すると思いますか?

(類似の観察可能なコードの作成は、読者の課題として残されています。)

于 2012-09-08T02:25:24.367 に答える
0

Observablesオブジェクト指向の観点からすると、ストリームを/を定義するインターフェースに基づいて考えるのは普通のことEnumerablesです。Enumerator で定義された Reset という便利なメソッドがあるという事実を無視できる場合、Enumerables は機能的に言​​えますf -> g -> value?。enumerable は本質的に、返される値がなくなるまで呼び出し続ける関数である列挙子を取得するために呼び出す関数です。

同様に、Observable は単純に次f(g) -> g(h) -> h(value?)のように定義されます。これは、値があるときに呼び出したい関数を提供する関数です。

これが、enumerable や observable を特定の方法で定義された一連の関数以外のものとして記述して構成できるようにする意味がない理由です。コントラクトは、計算を構成する機能を保証するためのものです。

それらがライブであるか、キャッシュされているか、レイジーであるかは、実装の詳細であり、他の場所で抽象化される可能性があります。これらの詳細が重要であることに同意することは確かですが、その機能的な性質に焦点を当てることがより重要です。

データベース クエリまたはディレクトリ リストであるシーケンスにはIEnumerable、事前に計算された値のセット (配列など) と同じインターフェイスがあります。その区別をするために、最終的にシーケンスを消費するコード次第です。高階関数を構成する方法であるという概念に慣れることができれば、Rx または Ix を使用して問題をモデル化する方が簡単であることがわかるでしょう。

于 2012-09-10T15:23:19.217 に答える