7

1つのアプリケーションサーバーに対して並列リクエストを行うためにHttpWebRequestを使用する場合、スロットルメカニズム(1秒あたりのリクエスト数)を実装する必要があります。私のC#アプリは、リモートサーバーに対して1秒あたり80を超えるリクエストを発行する必要があります。この制限は、リモートサービス管理者によって、ハード制限としてではなく、私のプラットフォームとそのプラットフォームの間の「SLA」として課されます。

HttpWebRequestを使用する場合、1秒あたりのリクエスト数を制御するにはどうすればよいですか?

4

3 に答える 3

3

私は同じ問題を抱えていて、すぐに解決できる解決策を見つけることができなかったので、解決策を作成しました。アイデアは、 を使用しBlockingCollection<T>て処理が必要なアイテムを追加し、Reactive Extensions を使用してレート制限されたプロセッサでサブスクライブすることです。

Throttle クラスは、このレート リミッタの名前が変更されたバージョンです

public static class BlockingCollectionExtensions
{
    // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed)
    public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken)
    {
        Subject<T> subject = new Subject<T>();

        // this is a dummyToken just so we can recreate the TokenSource
        // which we will pass the proxy class so it can cancel the task
        // on disposal
        CancellationToken dummyToken = new CancellationToken();
        CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken);

        var consumingTask = new Task(() =>
        {
            using (var throttle = new Throttle(items, timePeriod))
            {
                while (!sequence.IsCompleted)
                {
                    try
                    {
                        T item = sequence.Take(producerToken);
                        throttle.WaitToProceed();
                        try
                        {
                            subject.OnNext(item);
                        }
                        catch (Exception ex)
                        {
                            subject.OnError(ex);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
                subject.OnCompleted();
            }
        }, TaskCreationOptions.LongRunning);

        return new TaskAwareObservable<T>(subject, consumingTask, tokenSource);
    }

    private class TaskAwareObservable<T> : IObservable<T>, IDisposable
    {
        private readonly Task task;
        private readonly Subject<T> subject;
        private readonly CancellationTokenSource taskCancellationTokenSource;

        public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource)
        {
            this.task = task;
            this.subject = subject;
            this.taskCancellationTokenSource = tokenSource;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var disposable = subject.Subscribe(observer);
            if (task.Status == TaskStatus.Created)
                task.Start();
            return disposable;
        }

        public void Dispose()
        {
            // cancel consumption and wait task to finish
            taskCancellationTokenSource.Cancel();
            task.Wait();

            // dispose tokenSource and task
            taskCancellationTokenSource.Dispose();
            task.Dispose();

            // dispose subject
            subject.Dispose();
        }
    }
}

単体テスト:

class BlockCollectionExtensionsTest
{
    [Fact]
    public void AsRateLimitedObservable()
    {
        const int maxItems = 1; // fix this to 1 to ease testing
        TimeSpan during = TimeSpan.FromSeconds(1);

        // populate collection
        int[] items = new[] { 1, 2, 3, 4 };
        BlockingCollection<int> collection = new BlockingCollection<int>();
        foreach (var i in items) collection.Add(i);
        collection.CompleteAdding();

        IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None);
        BlockingCollection<int> processedItems = new BlockingCollection<int>();
        ManualResetEvent completed = new ManualResetEvent(false);
        DateTime last = DateTime.UtcNow;
        observable
            // this is so we'll receive exceptions
            .ObserveOn(new SynchronizationContext()) 
            .Subscribe(item =>
                {
                    if (item == 1)
                        last = DateTime.UtcNow;
                    else
                    {
                        TimeSpan diff = (DateTime.UtcNow - last);
                        last = DateTime.UtcNow;

                        Assert.InRange(diff.TotalMilliseconds,
                            during.TotalMilliseconds - 30,
                            during.TotalMilliseconds + 30);
                    }
                    processedItems.Add(item);
                },
                () => completed.Set()
            );
        completed.WaitOne();
        Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>());
    }
}
于 2012-05-02T10:28:27.937 に答える
0

Throttle() および Sample() 拡張メソッド (On Observable) を使用すると、イベントの高速シーケンスを「低速」シーケンスに調整できます。

Sample(Timespan)これは、最大レートを保証するを含むブログ投稿です。

于 2012-04-23T07:11:44.440 に答える
-1

私の最初の投稿では、クライアント動作拡張機能を介して WCF にスロットリング メカニズムを追加する方法について説明しましたが、質問を読み違えていることが指摘されました (doh!)。

全体的なアプローチは、レート制限に違反しているかどうかを判断するクラスをチェックすることです。レート違反をチェックする方法については、すでに多くの議論が行われています。

M リクエストのメソッド呼び出しを N 秒でスロットリングする

レート制限に違反している場合は、一定の間隔でスリープしてから再度確認してください。そうでない場合は、先に進んで HttpWebRequest 呼び出しを行います。

于 2012-04-21T08:16:38.320 に答える