1

私が最初に達成したいことを説明しましょう。

イベントストリームから次のデータを受信するとします

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };

データソースをサブスクライブすると、次の結果が得られます

"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"

基本的に、 okまたはerrorで始まるデータと、beginとendの間のデータを取得したいと思います。

私はこれまでこれを試しました。

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };



            var dataStream = Observable.Generate(
                                data.GetEnumerator(), 
                                e => e.MoveNext(), 
                                e => e, 
                                e => e.Current.ToString(), 
                                e => TimeSpan.FromSeconds(0.1));         

            var onelineStream = from d in dataStream
                                where d.StartsWith("ok") || d.StartsWith("error")
                                select d;

            // ???
            // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events"
            // but it is not working...
            var multiLineStream = from list in dataStream.Buffer<string, string, string>(
                                bufferOpenings: dataStream.Where(d => d.StartsWith("begin")),
                                bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end")))
                              select String.Join(" ", list);

            // merge two stream????
            // but I have no clue how to merge these twos :(

            mergeStream .Subscribe(d =>
            {
                Console.WriteLine(d);
                Console.WriteLine();
            });

私はリアクティブプログラミングに非常に慣れていないので、リアクティブな方法で考えさせることはできません。:(

前もって感謝します。

4

1 に答える 1

6

あなたは正解にとても近かったです!

基本的に、onelineStream&multiLineStreamクエリはほぼ適切でした。

それらを一緒にマージするのは非常に簡単です。これを行うだけです:

onelineStream.Merge(multiLineStream)

ただし、クエリが不十分だったのは、Observable.Generate値間の遅延を導入するために使用したものでした。これにより、複数のサブスクライバーがある場合に値を「ファンアウト」するオブザーバブルが作成されます。

データと look の定義を考えると、dataStreamこのコードの動作は次のようになります。

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine);
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine);

次の値が得られます。

!hello
@Using
!ok:michael
@ok
@1:232
!begin:events
@2:343
!end:events
!fdl
@error:dfljsdf
!error:fjkdjslf
@ok

あるサブスクリプションによって処理されたものと、別のサブスクリプションによって処理されたものがあることに注意してください。これは、onelineStream&multiLineStreamクエリがほぼ正しいとしても、それぞれ一部のデータしか表示されず、期待どおりに動作しないことを意味します。

値をスキップして複製できる競合状態を取得することもできます。そのため、この種の観察可能なものは避けるのが最善です。

値の間に遅延を導入するためのより良いアプローチは、これを行うことです:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100));

これにより、「コールド」オブザーバブルが作成されます。つまり、新しいサブスクライバーはすべて、オブザーバブルの新しいサブスクリプションを取得するため、最初の値から開始します。

multiLineStreamクエリは、コールド オブザーバブルでは正しく機能しません。

データ ストリームを "ホット" オブザーバブル (サブスクライバー間で値を共有する) にするために、Publish演算子を使用します。

したがって、multiLineStream次のようになります。

var multiLineStream =
    dataStream.Publish(ds =>
        from list in ds.Buffer(
            ds.Where(d => d.StartsWith("begin")),
            b => ds.Where(d => d.StartsWith("end")))
        select String.Join(" ", list));

その後、次のように結果を取得できます。

onelineStream.Merge(multiLineStream).Subscribe(d =>
{
    Console.WriteLine(d);
    Console.WriteLine();
});

これは私が得たものです:

ok:michael
ok
begin:events 1:232 2:343 end:events
error:dfljsdf
error:fjkdjslf
ok

それがうまくいくかどうか教えてください。

于 2012-05-12T01:01:01.753 に答える