138
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

ここに問題があります.1000以上の同時Webリクエストを開始します. これらの非同期 http 要求の同時実行量を制限する簡単な方法はありますか? 常に 20 を超える Web ページがダウンロードされないようにします。最も効率的な方法でそれを行う方法は?

4

11 に答える 11

205

.NET 4.5 Betaを使用して、最新バージョンのasyncfor.NETでこれを確実に行うことができます。'usr'からの以前の投稿は、Stephen Toubによって書かれた良い記事を指していますが、あまり発表されていないニュースは、非同期セマフォが実際に.NET4.5のベータリリースに組み込まれたことです。

私たちの最愛SemaphoreSlimのクラス(元のクラスよりもパフォーマンスが高いので使用する必要があります)を見ると、タイムアウト間隔、キャンセルトークン、通常のスケジューリングの友達など、予想されるすべての引数を含む一連のオーバーロードがSemaphore自慢です。 WaitAsync(...))。

Stephenは、ベータ版でリリースされた新しい.NET 4.5の機能について、より最近のブログ投稿も書いています。「. NET4.5ベータ版の並列処理の新機能」を参照してください。

最後に、非同期メソッドのスロットリングにSemaphoreSlimを使用する方法に関するサンプルコードを次に示します。

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

最後になりますが、おそらく言及する価値があるのは、TPLベースのスケジューリングを使用するソリューションです。まだ開始されていないTPLでデリゲートにバインドされたタスクを作成し、カスタムタスクスケジューラで同時実行を制限できるようにすることができます。実際、ここにそのためのMSDNサンプルがあります:

TaskSchedulerも参照してください。

于 2012-05-30T06:01:57.147 に答える
23

IEnumerable (つまり、URL の文字列) があり、これらのそれぞれで I/O バウンド操作を同時に実行したい (つまり、非同期 http 要求を作成する) 場合、およびオプションで同時実行の最大数も設定したい場合リアルタイムでの I/O リクエスト。これを行う方法は次のとおりです。この方法では、スレッド プールなどを使用せず、semaphoreslim を使用して、1 つの要求が完了し、セマフォを離れて次の要求が入るスライディング ウィンドウ パターンと同様に、最大同時 I/O 要求を制御します。

利用方法:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
于 2016-06-01T12:52:56.480 に答える
6

残念ながら、.NET Framework には、並列非同期タスクを調整するための最も重要なコンビネーターがありません。そのようなものは組み込まれていません。

最も著名な Stephen Toub によって作成されたAsyncSemaphoreクラスを見てください。必要なものはセマフォと呼ばれ、非同期バージョンが必要です。

于 2012-05-29T21:49:04.917 に答える
0

1000 個のタスクがすぐにキューに入れられる可能性がありますが、Parallel Tasks ライブラリは、マシンの CPU コアの量に等しい同時タスクしか処理できません。つまり、4 コアのマシンを使用している場合、(MaxDegreeOfParallelism を下げない限り) 特定の時間に実行されるタスクは 4 つだけです。

于 2012-05-29T21:32:14.937 に答える
-1

これはグローバル変数を変更するため、良い方法ではありません。また、非同期の一般的な解決策でもありません。しかし、HttpClient のすべてのインスタンスについては簡単です。あなたは簡単に試すことができます:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
于 2019-03-29T08:36:08.230 に答える
-2

CPUバウンド操作を高速化するには、並列計算を使用する必要があります。ここでは、I/Oバウンド操作について説明します。マルチコアCPUでビジー状態のシングルコアを圧倒している場合を除いて、実装は純粋に非同期である必要があります。

編集 ここで「非同期セマフォ」を使用するというusrの提案が気に入っています。

于 2012-05-29T21:34:29.780 に答える
-2

基本的に、ヒットする URL ごとにアクションまたはタスクを作成し、それらをリストに入れ、そのリストを処理して、並行して処理できる数を制限します。

私のブログ投稿では、タスクとアクションの両方でこれを行う方法を示し、ダウンロードして実行して両方の動作を確認できるサンプル プロジェクトを提供しています。

アクションあり

アクションを使用する場合は、組み込みの .Net Parallel.Invoke 関数を使用できます。ここでは、最大 20 個のスレッドを並行して実行するように制限しています。

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

タスクあり

Tasks には組み込み関数はありません。ただし、ブログで提供しているものを使用できます。

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

次に、タスクのリストを作成し、関数を呼び出してそれらを実行します。たとえば、一度に最大 20 個まで同時に実行すると、次のようになります。

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
于 2016-04-29T08:34:32.407 に答える
-3

MaxDegreeOfParallelismで指定できるオプションである を使用しParallel.ForEach()ます。

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
于 2012-05-29T21:43:57.120 に答える