10

Reactive Extensions を使用してメッセージを変換し、少し遅れて中継したいと考えています。

メッセージは次のようになります。

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

出力は次のようになります。

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

いくつかの要件があります。

  • 遅延の長さは、メッセージの内容によって異なります。
  • 各メッセージには GroupId があります
  • 新しいメッセージが送信待ちの遅延メッセージと同じ GroupId を持つ場合、最初のメッセージはドロップされ、新しい遅延期間の後に 2 番目のメッセージのみが送信されます。

Observable<InMsg> と Send 関数が与えられた場合:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

Select を使用して変換を実行できることを理解しています。

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • メッセージ指定遅延を適用するにはどうすればよいですか? (これにより、メッセージの配信が順不同になる可能性があることに注意してください。)
  • 同じ GroupId を持つメッセージを重複排除するにはどうすればよいですか?
  • Rx はこの問題を解決できますか?
  • これを解決する別の方法はありますか?
4

2 に答える 2

9

を使用GroupByしてIGroupedObservableDelay出力を遅延させ、Switch新しい値がグループ内の以前の値を確実に置き換えるようにすることができます。

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

実装に関する注意: グループ化された値がメッセージの送信後 (遅延後) に到着し場合、新しい遅延が開始されます。

于 2011-01-19T17:09:51.680 に答える