0

私の目標は、更新ハンドラーが "Type2" オブジェクトに対して 5 秒に 1 回しか呼び出されないようにすることです。オブザーバブルは 5 秒ごとに複数の値を生成しますが、最後に処理された更新から 5 秒以内に発生する値はすべて無視したいと考えています。

ここでこの質問をしました: 特定の条件が満たされた場合にのみスロットル

と、良い反響を得ました。Observable.Window を使用して目標を達成しようとしました。私はそれが機能していると思っていましたが、ウィンドウが閉じる直前に最初の更新が行われ(更新が処理される)、次のウィンドウが開くと別の更新が到着して処理されると、誤った出力が生成される可能性があることが判明しました。最後に処理された更新から 5 秒以内に届いたので、そうしたくありません。

リンクのコードからわずかに変更された、問題を示すコードを次に示します。

var source = new Subject<Thing>();    
var feed = source.Publish().RefCount();

var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    .Where(t => t.ActivationType == "Type2")
    .Window(() =>
            Observable.Timer(TimeSpan.FromSeconds(5))
            .Do(t => Console.WriteLine("\nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
    .Select(x => x.Take(1))
    .Merge()
    .Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));


var query = ofType1.Merge(ofType2);        
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    

int msDelay = 3000;
Task task = Task.Factory
    .StartNew(() => { Thread.Sleep(msDelay); })
    .ContinueWith((Task starter) =>
        {
            while (running)
            {
                var thing = new Thing();  //Note that all Things are by default Type2
                source.OnNext(thing);
                Thread.Sleep(100);
            }
        }, TaskContinuationOptions.LongRunning);

Console.ReadLine();

したがって、サブスクリプションが作成され、サブスクリプションが作成されると、ウィンドウで使用される Observable.Timer が開始されます。値を生成するために使用される while ループは、3000 ミリ秒の遅延の後まで開始されません。

出力は次のようになります。

A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752

TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754

TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755

TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755

TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756

TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756

ご覧のとおり、最初の Type2 更新はウィンドウが開いている間に行われるため、処理されます。次に、2 秒後、ウィンドウのタイマーが作動し、新しいウィンドウが開きます。それはすぐに次の Type2 更新を処理しますが、これは望んでいません。その後、正常に動作しているように見えます (Window 宣言で定義されているように、5 秒ごとに更新されます)。

5 秒 (または選択した時間枠) ごとに 1 つの更新のみが処理されるようにするために使用できる方法または別の方法はありますか?

4

1 に答える 1

1

解決策はあると思いますが、最初にいくつか提案をさせてください。質問には多くのノイズがあり、本当の質問が何であるかを見つけるのが難しいと思います.

事実上、「少なくとも 5 秒間無音の後に値を取得するにはどうすればよいですか」と尋ねています。Type1コードは気を散らすものです。シーケンスの生成も気を散らすものです。

それでは、サンプル コードをクリーンアップして、木の代わりに木が見えるかどうかを確認してみましょう。

まず、通過するタイプが何であるかが完全に関連しているとは思いません。あなたの例では、 をプッシュすることはないType1ので、代わりに整数を使用しましょう。それはそれをより簡単にするかもしれません。

次に、大きな loop+task+thread.sleep の代わりに Observable.Timer を使用するだけで、作成をクリーンアップできます。

これで、簡単な出発点ができました。

var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool);   
var feed = source.Publish().RefCount();

したがって、最初の問題は、使用している Window オーバーロードの誤解です。最初の値が押されたときにウィンドウが開くことを期待していると思います。そうではありません。タイマー (つまり、Observable.Timer(TimeSpan.FromSeconds(5))) は、ウィンドウが開くたびにサブスクライブされます。これは、最初にサブスクリプションが発生したときであり、ウィンドウ自体が閉じるときに再びサブスクライブされます。したがって、タイマーはすぐに開始され、最初のウィンドウでは 2 秒の値しか取得できません。

次に、問題空間を描きます。これを行う私のお気に入りの方法は、マーブル ダイアグラムを使用することです。ASCII ではあまりうまく変換されませんが、いずれかの方法で試してみましょう。

この入力シーケンスが与えられた場合:

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//
//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0

これは、3.0 秒で生成される値「0」を表すと想定されています。3.3 秒などで「1」が続きます。

問題空間が少し明確になったので、ウィンドウが開くべき場所、閉じるべき場所、次のウィンドウが開くべき場所を描くことができます。

代わりに、何が欲しいか見てみましょう。

ここでは、最初の値 ('1' @3.0s) が押されたときに開く Window1 (W1) を追加します。5秒後に閉じます。このウィンドウでは、最初の値 1 の後に沈黙が必要です。

ウィンドウ 2 (W2) は、最後のウィンドウが閉じた直後ではなく、次の値が生成されると開く必要があります (私は思う?!)。ここでは、値「17」が時間 8.4 秒でプッシュされると、これが Opened であることがわかります。

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890

//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--1--1--1--1--1--1--1--1--1--1--2--2--2--2--2--2--2--2--2--2--3--3--3--3--3--3--3--3--3--3--4--4--4--4-
                                                                        0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3
//
//W1      :                               0-------------------------------------------------|
//W2      :                                                                                  (17)----------------------------------------------|
//W3      :                                                                                                                                     (34)------------------------>

//expected: ------------------------------0--------------------------------------------------1--------------------------------------------------3---------------------------
                                                                                             7                                                  4

探している値が何であるかがわかったので、クエリを作成できます。

私はこれを思いつきました。フィードは実際にはホット シーケンスであると仮定します。この仮定を使用して、フィードから 1 つの値を取得し、5 秒間の無音をシーケンスに連結する繰り返し構造を構築します。次に、Repeat オペレーターを追加します。これは、5 秒間の沈黙が経過した後にのみフィードを再サブスクライブします。

public static IObservable<T> Silencer<T>(this IObservable<T> source, TimeSpan minSilencePeriod)
{
    return source.Take(1)
                 .Concat(Observable.Empty<T>().Delay(minSilencePeriod))
                 .Repeat();
}

これにより、期待どおりに値0、17、51などが生成されます。

これを元の質問のコードに適用します(いくつかのものをクリーンアップします)

void Main()
{
    var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool).Select(_=>new Thing());   
    var feed = source.Publish().RefCount();

    var ofType1 = feed.Where(t => t.ActivationType == "Type1");
    var ofType2 = feed
            .Where(t => t.ActivationType == "Type2")
            .Silencer(TimeSpan.FromSeconds(5));


    var query = ofType1.Merge(ofType2);        
    var subscription = query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    


    Console.ReadLine();
    subscription.Dispose();
}

少なくとも 5 秒間隔の値を持つ出力が表示されます

UPDATE: 3f0fc6f3-8a5a-476f-9661-b7330ab77877 09:14:04:725
UPDATE: fc8f0025-7a79-4329-8164-b8b421ad5865 09:14:09:817
UPDATE: ad739a71-885e-4d5b-a352-2302df0a4d87 09:14:14:925
于 2013-03-26T09:23:08.040 に答える