重要:結果の説明と詳細については、私の回答もご覧ください
通常レプリケートされる一連のオブジェクト/イベントをグループ化してフィルター処理し、TimeSpan 間隔でバッファーに入れる必要があります。私は大理石の図のようなものでそれをよりよく説明しようとしています:
X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z
生み出すだろう
X---Y---Z---X---Y---Z
ここで、X、Y、および Z は異なるイベント タイプであり、'---' は間隔を意味します。さらに、共通の基本クラスがあるため、すべての型で使用できるというキー プロパティによっても区別したいと思います。
X, Y, Z : A
A にはプロパティ Key が含まれます。X.Key = a を意味する表記 Xa を使用すると、最終的なサンプルは次のようになります。
X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
生み出すだろう
X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b
この動作を実現するために必要な Linq 演算子 (おそらく DistinctUntilChanged と Buffer) をまとめてくれる人はいますか? ありがとう
更新 18.08.12 :
リクエストに応じて、より良い説明をしようとします。イベントを収集して Web サービスに送信するデバイスがあります。これらのデバイスには古いロジックがあり (下位互換性のため変更できません)、確認応答を受信するまで継続的にイベントを送信します。承認後、キュー内の次のイベントを送信します。イベントには、ユニットのネットワーク アドレスと、各デバイスのキュー内のイベントを区別するその他のプロパティが含まれます。イベントは次のようになります。
class Event
{
public string NetworkAddress { get; }
public string EventCode { get; }
public string AdditionalAttribute { get; }
}
目標は、すべてのデバイスから受信した識別されたイベントを 5 秒ごとに処理し、情報をデータベースに保存し (そのため、バッチ処理を行いたくないのです)、ack をデバイスに送信することです。2 つのデバイスといくつかのイベントのみの例を作成してみましょう。
Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'
Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'
Pn are the operations done by our server, explained later
考えられるマーブル ダイアグラム (入力ストリーム + 出力ストリーム):
Device 'a' : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b' : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...
Time : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-
P1: Server stores and acknowledges [a1] and [b1]
P2: " " " " [b2]
P3: " " " " [a2] and [b3]
P4: " " " " [a3] and [b4]
最後に、おそらく基本的な演算子の単純な組み合わせだと思いますが、Rx は初めてで、同じ出力ストリームを取得する演算子 (または演算子の組み合わせ) がたくさんあるように見えるので、少し混乱しています。 .
更新 19.08.12 :
このコードはサーバー上で実行され、メモリ リークなしで数日間実行されるはずであることに注意してください...被験者の動作についてはわかりません。現時点では、イベントごとにサービスでプッシュ操作を呼び出します。これは、クエリを作成する必要があるサブジェクトの OnNext を呼び出します (サブジェクトの使用法について間違っていない場合)。
更新 20.08.12 :
検証テストを含む現在の実装。これは私が試したもので、@yamenによって提案されたのと同じようです
public interface IEventService
{
// Persists the events
void Add(IEnumerable<Event> events);
}
public class Event
{
public string Description { get; set; }
}
/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);
private readonly Subject<EventMessage> subject = new Subject<EventMessage>();
private readonly IDisposable subscription;
private readonly object locker = new object();
private readonly IEventService eventService;
/// <summary>
/// Initializes a new instance of the <see cref="EventManager"/> class.
/// </summary>
/// <param name="scheduler">The scheduler.</param>
public EventManager(IEventService eventService, IScheduler scheduler)
{
this.eventService = eventService;
this.subscription = this.CreateQuery(scheduler);
}
/// <summary>
/// Pushes the event.
/// </summary>
/// <param name="eventMessage">The event message.</param>
public void PushEvent(EventMessage eventMessage)
{
Contract.Requires(eventMessage != null);
this.subject.OnNext(eventMessage);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
{
// Dispose unmanaged resources
}
this.subject.Dispose();
this.subscription.Dispose();
}
private IDisposable CreateQuery(IScheduler scheduler)
{
var buffered = this.subject
.DistinctUntilChanged(new EventComparer())
.Buffer(EventHandlingPeriod, scheduler);
var query = buffered
.Subscribe(this.HandleEvents);
return query;
}
private void HandleEvents(IList<EventMessage> eventMessages)
{
Contract.Requires(eventMessages != null);
var events = eventMessages.Select(this.SelectEvent);
this.eventService.Add(events);
}
private Event SelectEvent(EventMessage message)
{
return new Event { Description = "evaluated description" };
}
private class EventComparer : IEqualityComparer<EventMessage>
{
public bool Equals(EventMessage x, EventMessage y)
{
return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
}
public int GetHashCode(EventMessage obj)
{
var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
return s.GetHashCode();
}
}
}
public class EventMessage
{
public string NetworkAddress { get; set; }
public byte EventCode { get; set; }
public byte Attribute { get; set; }
// Other properties
}
そしてテスト:
public void PushEventTest()
{
const string Address1 = "A:2.1.1";
const string Address2 = "A:2.1.2";
var eventServiceMock = new Mock<IEventService>();
var scheduler = new TestScheduler();
var target = new EventManager(eventServiceMock.Object, scheduler);
var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
scheduler.Schedule(() => target.PushEvent(eventMessageA1));
scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());
scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
}
さらに、何千ものメッセージを処理しながら、何日も問題なくソフトウェアを実行できることが非常に重要であることを再度述べておきます。明確にするために、現在の実装ではテストに合格しません。