10

重要:結果の説明と詳細については、私の回答もご覧ください

通常レプリケートされる一連のオブジェクト/イベントをグループ化してフィルター処理し、TimeSpan 間隔でバッファーに入れる必要があります。私は大理石の図のようなものでそれをよりよく説明しようとしています:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z

生み出すだろう

X---Y---Z---X---Y---Z

ここで、X、Y、および Z は異なるイベント タイプであり、'---' は間隔を意味します。さらに、共通の基本クラスがあるため、すべての型で使用できるというキー プロパティによっても区別したいと思います。

X, Y, Z : A

A にはプロパティ Key が含まれます。X.Key = a を意味する表記 Xa を使用すると、最終的なサンプルは次のようになります。

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c

生み出すだろう

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b

この動作を実現するために必要な Linq 演算子 (おそらく DistinctUntilChanged と Buffer) をまとめてくれる人はいますか? ありがとう

更新 18.08.12 :

リクエストに応じて、より良い説明をしようとします。イベントを収集して Web サービスに送信するデバイスがあります。これらのデバイスには古いロジックがあり (下位互換性のため変更できません)、確認応答を受信するまで継続的にイベントを送信します。承認後、キュー内の次のイベントを送信します。イベントには、ユニットのネットワーク アドレスと、各デバイスのキュー内のイベントを区別するその他のプロパティが含まれます。イベントは次のようになります。

class Event
{
    public string NetworkAddress { get; }

    public string EventCode { get; }

    public string AdditionalAttribute { get; }
}

目標は、すべてのデバイスから受信した識別されたイベントを 5 秒ごとに処理し、情報をデータベースに保存し (そのため、バッチ処理を行いたくないのです)、ack をデバイスに送信することです。2 つのデバイスといくつかのイベントのみの例を作成してみましょう。

Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'

Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'

Pn are the operations done by our server, explained later

考えられるマーブル ダイアグラム (入力ストリーム + 出力ストリーム):

Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b'          : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...

Time                : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-

P1: Server stores and acknowledges [a1] and [b1]
P2: "      "      "   "            [b2]
P3: "      "      "   "            [a2] and [b3]
P4: "      "      "   "            [a3] and [b4]

最後に、おそらく基本的な演算子の単純な組み合わせだと思いますが、Rx は初めてで、同じ出力ストリームを取得する演算子 (または演算子の組み合わせ) がたくさんあるように見えるので、少し混乱しています。 .

更新 19.08.12 :

このコードはサーバー上で実行され、メモリ リークなしで数日間実行されるはずであることに注意してください...被験者の動作についてはわかりません。現時点では、イベントごとにサービスでプッシュ操作を呼び出します。これは、クエリを作成する必要があるサブジェクトの OnNext を呼び出します (サブジェクトの使用法について間違っていない場合)。

更新 20.08.12 :

検証テストを含む現在の実装。これは私が試したもので、@yamenによって提案されたのと同じようです

public interface IEventService
{
    // Persists the events
    void Add(IEnumerable<Event> events);
}

public class Event
{
    public string Description { get; set; }
}

/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>();

    private readonly IDisposable subscription;

    private readonly object locker = new object();

    private readonly IEventService eventService;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventManager"/> class.
    /// </summary>
    /// <param name="scheduler">The scheduler.</param>
    public EventManager(IEventService eventService, IScheduler scheduler)
    {
        this.eventService = eventService;
        this.subscription = this.CreateQuery(scheduler);
    }

    /// <summary>
    /// Pushes the event.
    /// </summary>
    /// <param name="eventMessage">The event message.</param>
    public void PushEvent(EventMessage eventMessage)
    {
        Contract.Requires(eventMessage != null);
        this.subject.OnNext(eventMessage);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        this.Dispose(true);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Dispose unmanaged resources
        }

        this.subject.Dispose();
        this.subscription.Dispose();
    }

    private IDisposable CreateQuery(IScheduler scheduler)
    {
        var buffered = this.subject
            .DistinctUntilChanged(new EventComparer())
            .Buffer(EventHandlingPeriod, scheduler);

        var query = buffered
            .Subscribe(this.HandleEvents);
        return query;
    }

    private void HandleEvents(IList<EventMessage> eventMessages)
    {
        Contract.Requires(eventMessages != null);
        var events = eventMessages.Select(this.SelectEvent);
        this.eventService.Add(events);
    }

    private Event SelectEvent(EventMessage message)
    {
        return new Event { Description = "evaluated description" };
    }

    private class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
        }

        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }
}

public class EventMessage
{
    public string NetworkAddress { get; set; }

    public byte EventCode { get; set; }

    public byte Attribute { get; set; }

    // Other properties
}

そしてテスト:

public void PushEventTest()
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";

        var eventServiceMock = new Mock<IEventService>();

        var scheduler = new TestScheduler();
        var target = new EventManager(eventServiceMock.Object, scheduler);
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        scheduler.Schedule(() => target.PushEvent(eventMessageA1));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());

        scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
    }

さらに、何千ものメッセージを処理しながら、何日も問題なくソフトウェアを実行できることが非常に重要であることを再度述べておきます。明確にするために、現在の実装ではテストに合格しません。

4

3 に答える 3

4

これがあなたの望むとおりに機能するかどうかはわかりませんが、groupキーワードを使用して明示的に要素をグループ化し、IObservableそれらを再結合する前にさまざまな を個別に操作する必要があるかもしれません。

たとえば、次のようなクラス定義がある場合

class A
{
    public char Key { get; set; }
}

class X : A { }
...

そしてSubject<A>

Subject<A> subject = new Subject<A>();

それから私たちは書くことができます

var buffered =
    from a in subject
    group a by new { Type = a.GetType(), Key = a.Key } into g
    from buffer in g.Buffer(TimeSpan.FromMilliseconds(300))
    where buffer.Any()
    select new
    {
        Count = buffer.Count,
        Type = buffer.First().GetType().Name,
        Key = buffer.First().Key
    };

buffered.Do(Console.WriteLine).Subscribe();

提供されたデータを使用してこれをテストできます。

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100);
subject.OnNext(new X { Key = 'b' }); 
Thread.Sleep(100);
subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100);
...
subject.OnCompleted();

提供した出力を取得するには:

{ Count = 2, Type = X, Key = a }
{ Count = 1, Type = X, Key = b }
{ Count = 1, Type = Y, Key = b }
{ Count = 1, Type = Y, Key = c }
{ Count = 2, Type = Z, Key = a }
{ Count = 2, Type = Z, Key = c }
{ Count = 1, Type = Z, Key = b }
于 2012-08-11T21:25:31.520 に答える
2

これがまさにあなたが望むものかどうかはわかりませんが、ユースケースをサポートしているようです.

まず、使用する基本クラスを定義しましょう (必要に応じて簡単に変更できます)。

public class MyEvent
{
    public string NetworkAddress { set; get; }
    public string EventCode { set; get; }
}

デバイスを配列として設定しましょうIObservable<MyEvent>- これらは異なる方法で利用できる場合があり、もちろんそれに対応するために以下を変更する必要があります。これらのデバイスはそれぞれ、0.5 ~ 1.5 秒のランダムな遅延で値を生成します。

var deviceA = new MyEvent[] { new MyEvent() {NetworkAddress = "A", EventCode = "1"},
                              new MyEvent() {NetworkAddress = "A", EventCode = "1"},
                              new MyEvent() {NetworkAddress = "A", EventCode = "2"} };

var deviceB = new MyEvent[] { new MyEvent() {NetworkAddress = "B", EventCode = "1"},
                              new MyEvent() {NetworkAddress = "B", EventCode = "2"},
                              new MyEvent() {NetworkAddress = "B", EventCode = "2"},
                              new MyEvent() {NetworkAddress = "B", EventCode = "3"} };   

var random = new Random();                                 

var deviceARand = deviceA.ToObservable().Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat();
var deviceBRand = deviceB.ToObservable().Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat();

var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand };

次に、これらの個々のデバイス ストリームをすべて取得し、それらを「区別」して、単一のマスター ストリームにマージしましょう。

var stream = devices.Aggregate(Observable.Empty<MyEvent>(), (acc, device) => acc.DistinctUntilChanged(a => a.EventCode).Merge(device));

それができたら、このストリームを定期的に消費するには、次のようにバッファリングするだけですBuffer:

stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ });
于 2012-08-20T07:36:17.447 に答える
0

検索と実験の後、期待どおりの出力を生成するコードをいくつかまとめました。

static void Main(string[] args)
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";
        var comparer = new EventComparer();
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 };
        var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 };

        var queue = new BlockingCollection<EventMessage>();
        Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe
            (
                l => list.ToList().ForEach(m =>
                {
                    Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId);
                    queue.Add(m);
                })
            );

        // subscribing
        queue.GetConsumingEnumerable()
            .ToObservable()
             .Buffer(TimeSpan.FromSeconds(5))
             .Subscribe(e =>
                 {
                     Console.WriteLine("Queue contains {0} items", queue.Count);
                     e.Distinct(comparer).ToList().ForEach(m =>
                  Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count));
                 }
             );

        Console.WriteLine("Type enter to exit");
        Console.ReadLine();
    }

    public class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            var result = x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
            return result;
        }

        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }

    public class EventMessage
    {
        public string NetworkAddress { get; set; }

        public byte EventCode { get; set; }

        public byte Attribute { get; set; }

        public override string ToString()
        {
            const string Format = "{0} ({1}, {2})";
            var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute);
            return s;
        }
    }

とにかく、アプリケーションを監視すると、これがメモリ リークを引き起こしているようです。私の質問は今です:

  • メモリリークの原因は何ですか? 【以下のアップデートをご覧ください】
  • これはそれを行うための最良の方法ですか (最初のオブザーバブルに個別を置くと、次のバッファーで他のイベントを取得しませんが、各バッファー内のアイテムは他のものから分離する必要があります)?
  • テスト スケジューラを使用してテストを作成するにはどうすればよいですか?

更新

メモリの増分は数分しか続かないようで、その後値は安定しています。私は長いテストを実行します。もちろん、これは絶対に許容できる動作です。

更新 26.08.12 :

  • 以前の更新で既に述べたように、メモリ使用量は、起動後数分間だけ (そしてゆっくりと) 増加します。8 時間後、消費されたメモリは安定しており、通常の変動は数 KB の範囲でした)。
  • この質問は私のものと非常に似ており、提案された Drain 拡張機能は私の問題にうまく適用できます (まだ検証中です)。

とにかく、テストスケジューラを使用した単体テストについては、私の質問がまだ開いていると思います。

ありがとうフランチェスコ

于 2012-08-25T10:06:26.603 に答える