7

いわばRx酒宴に入りました。この質問は私のここここに関連しています。それにもかかわらず、これらは同じテーマの有用なバリエーションと見なすことができたので、誰かの助けになるかもしれません.

質問:オブジェクトのランダムなストリームint(たとえば、ランダムな間隔で生成された [0, 10] の間隔で) のオブジェクトをグループにグループ化し、earth グループに可変数のイベント不在アラームを提供するにはどうすればよいでしょうか (より適切な定義がないため、さらなる背景については、リンクされた投稿を参照してください)。より具体的にコードを使用すると、次のようにグループごとに複数のスロットル設定を定義する方法は次のとおりです。

var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));

ここでは、グループごとに 1 秒以上 ID がない場合に、サブスクライブ関数が呼び出されます。イベントがない場合に 3 つの異なる値 (たとえば、1 秒、5 秒、10 秒) を定義し、イベントが到着するとすべてキャンセルされるとしたらどうでしょうか。私が考えることができるのは次のとおりです。

  • 各 IDidStreamをいくつかの合成 ID に分割し、実際の ID と合成 ID の間の全単射マッピングを提供します。たとえば、この場合の ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 のThrottleようにセレクター関数を定義し、呼び出されFunc<int, Timespan>(i => /* switch(i)...*/)たときSubscribeに ID をマップし直します。詳細な背景については、リンクされた質問も参照してください。
  • ID がグループ化されたネストされたグループを作成し、さらに ID グループがスロットリング値に従ってグループにコピー/レプリケート/フォーク (適切な用語はわかりません) されます。このアプローチはかなり複雑で、最適な方法かどうかはわかりません。それでも、そのようなクエリを見てみたいと思います。

より一般的な設定では、これに関連するものを見つけることができませんでしたが、これはあるグループごとに複数のハンドラーが存在する状況であると思われます。

<編集: (できれば明確にするため) 例として、idStream1 つの ID: 1 をプッシュします。これにより、3 つの異なるカウンターが開始され、それぞれが次のイベントの発生を待機するか、新しい ID 1 が時間内に検出されない場合に警告します。カウンター 1 (C1) は 5 秒間、カウンター 2 (C2) は 7 秒間、カウンター 3 (C3) は 10 秒間待機します。新しい ID 1 が間隔 [0, 5] 秒以内に受信されると、すべてのカウンターが前述の値で再初期化され、アラームは送信されません。新しい ID が間隔 [0, 7) 秒以内に受信されると、C1 アラームと C2 および C3 が再初期化されます。同様に、新しい ID が間隔 [0, 10) 秒以内に受信される場合、C1 と C2 が起動しますが、C3 は再初期化されます。

つまり、いくつかの条件が与えられた場合、1 つの ID に対して複数の「不在アラーム」または一般的に実行されるアクションが存在します。何が良い類似物になるかはわかりません... おそらく、タワーに「警告灯」を積み重ねて、最初に緑、次に黄色、最後に赤になるようにします。ID の不在が長く続くと、次から次へと色が点灯します (この場合、赤が最後の色です)。その後、1 つの ID が検出されると、すべてのライトがオフになります。

<編集 2: 次のように James のコードを例に改造し、残りはそのままにしてSubscribeおくと、2 つのアラーム レベルの両方で、最初のイベントで が直接呼び出されることがわかりました。

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

ここで何が起こっているかを見てみましょうMaxLevels。動的に提供できるかどうか...

<編集 3: James のコードは機能します。問題は椅子とキーボードの間にありました!時間をより賢明なものに変更することは確かに役に立ちました. 実際、私はそれらをより大きな数字に変更しましたが.FromTicks、それはそうであり、数分間私を逃れました.

4

1 に答える 1

5

これはうまくいくと思います-後でより完全な説明を追加しようとします。各アラーム レベルには、(信号グループごとに) 定義されたしきい値があります。これらの期間は長くなると予想されます。

基本的な考え方は、以前のすべてのレベルの信号を現在のレベルにフィードすることです。最初のレベルは、アラーム ストリームが返される前にフィルター処理される信号自体の「ゼロ」レベルです。TSignal キーは値の同一性をサポートする必要があることに注意してください。

単純化の余地があると確信しています!

サンプル単体テスト:

public class AlarmTests : ReactiveTest
{
    [Test]
    public void MultipleKeyMultipleSignalMultipleLevelTest()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(800);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(200, 2),
            OnNext(400, 1),
            OnNext(420, 2),
            OnNext(800, 1),
            OnNext(1000, 1),
            OnNext(1200, 1));

        Func<int, int> keySelector = i => i;
        Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(700, new Alarm<int>(1, 1)),
            OnNext(720, new Alarm<int>(2, 1)),
            OnNext(1220, new Alarm<int>(2, 2)),
            OnNext(1500, new Alarm<int>(1, 1)),
            OnNext(2000, new Alarm<int>(1, 2)));
    }

    [Test]
    public void CheckAlarmIsSuppressed()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(500);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(400, 1),
            OnNext(600, 1));

        Func<int, int> keySelector = i => i;

        Func<int, int, TimeSpan> thresholdSelector = (signal, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(900, new Alarm<int>(1, 1)),
            OnNext(1100, new Alarm<int>(1, 2)));
    }
}



public static class ObservableExtensions
{
    /// <summary>
    /// Create an alarm system that detects signal gaps of length
    /// determined by a signal key and signals alarms of increasing severity.
    /// </summary>
    /// <typeparam name="TSignal">Type of the signal</typeparam>
    /// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals correctly</typeparam>
    /// <param name="signals">Input signal stream</param>
    /// <param name="keySelector">Function to select a key from a signal for grouping</param>
    /// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level.
    /// Should return TimeSpan.MaxValue for levels above the highest level</param>
    /// <param name="levels">Number of alarm levels</param>
    /// <param name="scheduler">Scheduler use for throttling</param>
    /// <returns>A stream of alarms each of which contains the signal and alarm level</returns>
    public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>(
        this IObservable<TSignal> signals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int levels,
        IScheduler scheduler)
    {
        var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0))
                                  .Publish()
                                  .RefCount();

        for (int i = 0; i < levels; i++)
        {
            alarmSignals = alarmSignals.CreateAlarmSystemLevel(
                keySelector, thresholdSelector, i + 1, scheduler);
        }

        return alarmSignals.Where(alarm => alarm.Level != 0);

    }

    private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>(
        this IObservable<Alarm<TSignal>> alarmSignals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int level,
        IScheduler scheduler)
    {
        return alarmSignals
            .Where(alarmSignal => alarmSignal.Level == 0)
            .Select(alarmSignal => alarmSignal.Signal)
            .GroupByUntil(
                keySelector,
                grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
            .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level)))
            .Merge(alarmSignals);
    }
}

public class Alarm<TSignal> : IEquatable<Alarm<TSignal>>
{
    public Alarm(TSignal signal, int level)
    {
        Signal = signal;
        Level = level;
    }

    public TSignal Signal { get; private set; }
    public int Level { get; private set; }

    private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        if (ReferenceEquals(x, null))
            return false;
        if (ReferenceEquals(y, null))
            return false;
        if (ReferenceEquals(x, y))
            return true;

        return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level);
    }

    // Equality implementation added to help with testing.
    public override bool Equals(object other)
    {
        return Equals(this, other as Alarm<TSignal>);
    }

    public override string ToString()
    {
        return string.Format("Signal: {0} Level: {1}", Signal, Level);
    }

    public bool Equals(Alarm<TSignal> other)
    {
        return Equals(this, other);
    }

    public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return Equals(x, y);
    }

    public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return !Equals(x, y);
    }

    public override int GetHashCode()
    {
        return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329);
    }
}
于 2013-10-18T01:39:13.727 に答える