Reactive Extensions を使用してメッセージを変換し、少し遅れて中継したいと考えています。
メッセージは次のようになります。
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
出力は次のようになります。
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
いくつかの要件があります。
- 遅延の長さは、メッセージの内容によって異なります。
- 各メッセージには GroupId があります
- 新しいメッセージが送信待ちの遅延メッセージと同じ GroupId を持つ場合、最初のメッセージはドロップされ、新しい遅延期間の後に 2 番目のメッセージのみが送信されます。
Observable<InMsg> と Send 関数が与えられた場合:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
Select を使用して変換を実行できることを理解しています。
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- メッセージ指定遅延を適用するにはどうすればよいですか? (これにより、メッセージの配信が順不同になる可能性があることに注意してください。)
- 同じ GroupId を持つメッセージを重複排除するにはどうすればよいですか?
- Rx はこの問題を解決できますか?
- これを解決する別の方法はありますか?