0

Reactive Extensions を使用して匿名の Observable を作成し、Web リクエストを実行して後処理し、何らかの値を返します。この手法は、値取得のメカニズムを、これらの値を使用する他のクラスから分離するのに役立ちました。Web リクエストは頻繁に実行されるため、この手法を使用してリクエストを構築および呼び出すタイマーがあります。Observable を次のようにビルドします。

Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)

実際、エンドポイントは常に利用できるとは限りません。Web リクエスト タイムアウトの場合、Subscribe()メソッドで指定されたコールバックは呼び出されません。このシナリオはTimeout()Rx 拡張機能で処理しています。また、リクエスト結果の後処理も行っています。この問題に対処するために実装したエミュレーション コードの一部を次に示します。

var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)

Subscribe()次に、Observable からコールバック デリゲートをサブスクライブ解除するために、メソッドの戻り値を破棄する実装を実装しました。実際、私はトリプルのコレクションを持ってい<IObservable obs, IDisposable disp, bool RequestComplete>ます。Subscribe()メソッドのコールバックが設定されRequestComplete = true、不要になった場合、つまりコールバックがすでに呼び出されている場合は、すべての使い捨てが Disposed になります。すべてがうまくいっているように見えましたが、コレクションが増え続けていることに気付きました。その理由はわかりません。

そこで、この状況のエミュレーションを実装しました。このコードには破棄部分がありません。単純なリクエスト カウンターに置き換えました。

public static class RequestCounter
{
    private static object syncRoot = new object();

    private static int count = 0;

    public static void Increment()
    {
        lock (syncRoot)
        {
            count++;
        }
    }

    public static void Decrement()
    {
        lock (syncRoot)
        {
            count--;
        }
    }

    public static int RequestCount
    {
        get { return count; }
    }
}

そして、ここにテスト自体があります:

public void RunTest()
    {
        while (true)
        {
            var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
            var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
            RequestCounter.Increment();
            var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
                res =>
                {
                    RequestCounter.Decrement();
                    Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
                },
                ex =>
                {
                    RequestCounter.Decrement();
                    Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
                },
                () =>
                {
                });
        }
    }

Web リクエスト カウンタがインクリメントされる前。リクエストのタイムアウト後、デクリメントされます。しかし、リクエスト カウンターは無限に大きくなります。何が欠けているのか説明してもらえますか?

4

1 に答える 1

1

監視可能なシーケンスへのサブスクリプションは非同期です。whileループは、可能な限り高速に実行され続けます。増分は、遅延した減分を上回ります。

.NET4.5でRxv2.0を使用している場合は、awaitを使用してループを実行し、前のリクエストが完了したときに次の反復に進むことができます。

while (true)
{
    var obs = ...;

    RequestCounter.Increment();

    await obs.Select(...).Timeout(...);

    RequestCounter.Decrement();

    Console.WriteLine(...);
}

より大きな問題は、リクエストが完了した場合にサブスクリプションを破棄しようとしている理由です。Rxは、シーケンスが終了状態(OnError、OnCompleted)に達すると、それ自体の後でクリーンアップするため、IDisposableオブジェクトを床にドロップするだけで、それらについて心配する必要はないようです。

于 2012-10-05T09:36:39.187 に答える