1

IObservable<Packet>さまざまなサブスクリプションが着信パケットを分析できるようにするホットオブザーバブルです。

ID を持つデータを送信し、同じ ID を持つ応答を待機するメソッドを作成したいと考えています。擬似コード:

void SendData(byte[] data, int retries, int timeout, Action<Packet> success, Action fail) 
{
    var sequenceId = GetSequenceId();
    _port.SendData(data, sequenceId);
    _packetStream.Where(p => p.SequenceId == sequenceId)
                 .Take(1)
                 .WaitForTimeout(timeout)
                 .WaitForRetry(retries)
                 .Subscribe(success) //Need to unsubscribe after packet is received
    //If we didn't receive an answer packet, then call fail() action 
}

このようなことが Reactive Extensions で通常どのように行われるかはよくわかりません。いくつかの提案を受け取って本当にうれしいです。ありがとう。

4

2 に答える 2

1

あなたの質問のコードは右に近いように見えます。Rxフレームワークには2つの「待機」メソッド(TimeoutRetry)があります。メソッドを変更してanを返し、andパラメーターIObservableを削除することをお勧めします。そうすることで、「モナドにとどまり」、必要に応じて、さらにオペレーターをオブザーバブルにチェーンできます。代わりに、結果のobservableをサブスクライブするときにsuccessパラメーターとfailパラメーターが使用されます(それぞれOnNextおよびOnErrorとして)。 successfail

タイムアウト時にデータを再送する必要があると思います(そうしないと、実際には再試行しません)。これを行うには、Observable.Createサブスクリプション時にデータを送信するために使用できます。

IObservable<Packet> SendData(byte[] data, int retries, TimeSpan timeout)
{
    //only get the sequence id once per call to SendData, regardless of retries
    var sequenceId = GetSequenceId();
    return Observable.Create(obs =>
        {   //this code runs every time you subscribe
            _port.SendData(data, sequenceId);
            return _packetStream.Where(p => p.SequenceId == sequenceId)
                                .Take(1)
                                .Timeout(timeout)
                                .Subscribe(obs)
        })
        .Retry(retries); 
}

演算子を最後に置くと、Retryタイムアウトした場合にCreateobservableが再試行されます。余談ですが、タイムアウトの場合に使用する別の監視可能なシーケンスを渡すことができるタイムアウトのオーバーロードがあります。このオーバーロードを一緒に使用してObservable.Throw、代替エラーメッセージを提供するなど、必要に応じてタイムアウトの場合に独自の例外を提供できます。

このコードは、サブスクライブするまでデータを送信せ、結果が返されるかタイムアウトに達するまでブロックされませんが、サブスクリプションを破棄することでそれ以上の再試行をキャンセルできることに注意してください。このコードは、同時に複数のパケットを送信することも妨げません。ブロックする必要がある場合は、次のようにすることができます。

var response = SendData(/* arguments */);
response.Do(success, fail).StartWith(null).ToTask().Wait();

C#5を使用していて、非同期メソッド内でこれを呼び出す場合は、observableを待つことができます。

于 2012-12-28T19:15:14.327 に答える
0

これまでに得たものは次のとおりです。

private void WaitForAnswer(byte sequenceId, int timeout, Action<Packet> success, Action fail, int retriesLeft)
{
    _packetStream.Where(p => p.GetSequence() == sequenceId)
                 .Take(1)
                 .Timeout(TimeSpan.FromMilliseconds(timeout))
                 .Subscribe(success, 
                            _ => {
                                if (retriesLeft > 0) WaitForAnswer(sequenceId, timeout, success, fail, --retriesLeft);
                                else fail();
                            }, 
                            () => { });
}

ただし、このソリューションがサブスクリプションを正しく破棄するかどうかはよくわかりません。

于 2012-12-28T10:36:43.397 に答える