5

サブスクライブしているオブザーバブルがあります。このオブザーバブルは、複数回設定できる ActivationType というプロパティを持つオブジェクトを返します。

私が達成しようとしているのは、ActivationType が「Type1」に設定されるたびにメッセージをログに記録することです。ただし、ActivationType が「Type2」に設定されている場合、ActivationType が「Type2」の場合は、メッセージを 1 回だけログに記録し、30 秒待ってから再度ログに記録します。

だから私が持っている場合:

myObservable
    .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2")  //listen for types 1 and 2
    .Throttle() // ??? somehow only throttle if we are currently looking at Type2
    .Subscribe(Log); //log some stuff

Throttle() は私が探しているものだと思いますが、条件付きでトリガーする方法がわかりません。

助言がありますか?

4

2 に答える 2

6

Windowああ、ほとんど理解不能なオペレーターにとって完璧なケースです!

編集: 私はこのリンクを月に数十回投稿してWindowJoinます。BufferGroupJoin

Lee Campbell: Rx パート 9 – 結合、ウィンドウ、バッファ、およびグループ結合

var source = new Subject<Thing>();

var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    // only window the type2s
    .Where(t => t.ActivationType == "Type2")
    // our "end window selector" will be a tick 30s off from start
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(30)))
    // we want the first one in each window...
    .Select(lst => lst.Take(1))
    // moosh them all back together
    .Merge();

    // We want all "type 1s" and the buffered outputs of "type 2s"
    var query = ofType1.Merge(ofType2);

    // Let's set up a fake stream of data
    var running = true;
    var feeder = Task.Factory.StartNew(
       () => { 
         // until we say stop...
         while(running) 
         { 
             // pump new Things into the stream every 500ms
             source.OnNext(new Thing()); 
             Thread.Sleep(500); 
         }
    });

    using(query.Subscribe(Console.WriteLine))
    {               
        // Block until we hit enter so we can see the live output 
        // from the above subscribe 
        Console.ReadLine();
        // Shutdown our fake feeder
        running = false;
        feeder.Wait();
     }
于 2013-03-06T18:28:09.307 に答える
2

2つのストリームを使用しないのはなぜですか?

var baseStream = myObservable.Publish().RefCount(); // evaluate once
var type1 = baseStream.Where(o => o.ActivationType == "Type1");
var type2 = baseStream.Where(o => o.ActivationType == "Type2").Throttle(TimeSpan.FromSeconds(30));

type1.Merge(type2).Subscribe(Log);
于 2013-03-06T17:26:31.417 に答える