3

私のシステムには、接続ステータス、CPU 負荷、ログに記録されたユーザーなど、多くのステータス オブジェクトがあります。このようなイベントはすべて、単一の監視可能なストリームにマージされます。

システムの実際のステータスを表示し、そのすべてのカウンターを表示する管理ユーティリティを作成したいと考えています。

すべてのカウンターの最後に変更された値のリストを持つオブザーバブルを作成するにはどうすればよいですか?

これが私が欲しい大理石の図です:

s1 (cpu):               -s1_v1----s1_v1---s1_v2
s2 (users count):       --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec)  ----s3_v1----s3_v1----s3_v1

flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2

望ましい出力:

s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
      s2_v1|s2_v1|s2_v1|s2_v2
            s3_v1|s3_v1|s3_v1

これまでのところ、この実装にできます:

public class StatusImplementation
{
    public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
        params IObservable<KeyValuePair<TKey, TValue>>[] observables)
    {
        var uniqueObservables = observables
            .Select(x => x.Publish().RefCount().DistinctUntilChanged());

        return Observable.Create<IDictionary<TKey, TValue>>(o =>
        {
            var compositeDisposable = new CompositeDisposable();
            var dictionary = new Dictionary<TKey, TValue>();

            foreach (var uniqueObservable in uniqueObservables)
            {
                var disposable = uniqueObservable.Subscribe(x =>
                {
                    if (dictionary.ContainsKey(x.Key) && !dictionary[x.Key].Equals(x.Value))
                    {
                        var newDictionary = new Dictionary<TKey, TValue>(dictionary);
                        newDictionary[x.Key] = x.Value;
                        dictionary = newDictionary;
                    }
                    else
                    {
                        dictionary.Add(x.Key, x.Value);
                    }

                    o.OnNext(dictionary);
                });
                compositeDisposable.Add(disposable);
            }

            return compositeDisposable;
        });
    }
}

そして、ここに使用例があります:

        var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
            .Select(x => new KeyValuePair<string, long>("event 1", x));
        var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
            .Select(x => new KeyValuePair<string, long>("event 2", x));
        var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
            .Select(x => new KeyValuePair<string, long>("event 3", x));

        var combined = f1.Merge(f2).Merge(f3);

        StatusImplementation.Status(f1, f2, f3)
            .Select(x => string.Join(", ", x.ToList()))
            .Dump("\tstatus");

        combined.Dump("normal");

そして Dump 関数 ( Lee Campbell の素晴らしい本から):

    public static void Dump<T>(this IObservable<T> source, string name)
    {
        source.Subscribe(
            i => Console.WriteLine("{0}-->{1}", name, i),
            ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
            () => Console.WriteLine("{0} completed", name));
    }

問題は、この機能を実装するためのより良い方法はありますか? おそらく、オブザーバブル内で Dictionary を使用していませんか?

ありがとうございました。

4

2 に答える 2

1

Observable.CombineLatest新しい値が到着するたびに、すべてのオブザーバブルから最新の値を発行する を使用できます。そうすれば、辞書を使う必要はありません。

 var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
 .Select(x = > new KeyValuePair < string, long > ("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
 .Select(x = > new KeyValuePair < string, long > ("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
 .Select(x = > new KeyValuePair < string, long > ("event 3", x));

var combined = f1.Merge(f2).Merge(f3);

Observable.CombineLatest(f1, f2, f3)
 .Select(x = > string.Join(", ", x.ToList()))
 .Dump("\tstatus");

combined.Dump("normal");
于 2015-05-23T01:19:38.390 に答える
1

したがって、オブザーバブルから開始する場合combined(任意の数のソース オブザーバブルから生成できます)、次のことができます。

var query =
    combined
        .Scan(
            new Dictionary<string, long>() as IDictionary<string, long>,
            (d, kvp) =>
            {
                var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
                d2[kvp.Key] = kvp.Value;
                return d2;
            });

combinedこれにより、オブザーバブルによって生成された各値の一連の辞書オブジェクトが返されます。各ディクショナリ オブジェクトは個別のインスタンスになります。同じインスタンスが返された場合、スレッドの問題を引き起こす可能性のある値を変更することがあります。

于 2015-05-23T12:37:41.737 に答える