9

.Net で async-await を使用しています。同時非同期呼び出しの数を制限するにはどうすればよいですか?

4

4 に答える 4

6

比較的簡単な方法の 1 つは、TPL データフローを (悪用) 使用することです。何かのようなもの:

public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
    IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
    int maxDegreeOfParallelism)
{
    var outputs = new ConcurrentQueue<TOutput>();

    var block = new ActionBlock<TInput>(
        async x => outputs.Enqueue(await asyncFunction(x)),
        new ExecutionDataflowBlockOptions
        { MaxDgreeOfParallelism = maxDegreeOfParallelism });

    foreach (var input in inputs)
        block.Send(input);

    block.Complete();
    block.Completion.Wait();

    return outputs.ToArray();
}
于 2012-09-15T17:21:09.610 に答える
0

私はこのテクニックの方が好きです。TaskCompletionSource着信タスクの出力タスクを作成するために使用しています。Taskこれが必要なのは、実行する前にaを返したいからです。以下のクラスは、各入力Func(of Task(of Object))をすぐに返されるaTaskCompletionSourceに関連付け、それらをキューに入れます。

キューの要素は実行中のタスクのリストにデキューされ、継続するとが設定されますTaskCompletionSource。ループ内でを呼び出すとWhenAny、部屋が空いたときに要素がキューから実行リストに移動します。WhenAny同時実行性の問題があるかもしれませんが、一度に1つしか存在しないことを確認するためのチェックもあります。

使用するには、次のような同期関数を置き換えるだけです。

Task.Run(AddressOf MySyncFunction) 'possibly many of these

これとともに:

Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.

すでにタスクを返す関数の場合、それらをタスクを返す関数に変換して、thottlerがそれらを実行できるようにすることが重要です。交換:

NewTask = MyFunctionAsync()

と:

NewTask = t1.Run(Function () return MyFunctionAsync())

以下のクラスは、関数が同期/非同期、入力あり/なし、出力あり/なしのいずれであるかに応じて、Throttler.Run()のさまざまなシグネチャも実装します。TaskをTask(Of Output)に変換するのは特に注意が必要です。

Class Throttler
    Property MaxCount As Integer

    Sub New(Optional MaxCount As Integer = 1)
        Me.MaxCount = MaxCount
    End Sub

    Private Running As New List(Of Task)
    Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
    Private AlreadyWaiting As Boolean

    Async Sub MakeWaiter()
        If AlreadyWaiting Then Exit Sub
        AlreadyWaiting = True
        Do While Waiting.Count > 0
            Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
            Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
                Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
                Dim NewTask As Task(Of Object) = NewFunc()
                Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
                NewTask.ContinueWith(Sub(t2 As Task(Of Object))
                                         CurrentTcs.SetResult(t2.Result)
                                     End Sub)
                Running.Add(NewTask)
            Loop
            If Waiting.Count > 0 Then
                Dim Waiter As Task(Of Task)
                Waiter = Task.WhenAny(Running)
                Dim FinishedTask As Task = Await Waiter
                Await FinishedTask
                Running.Remove(FinishedTask)
            End If
        Loop
        AlreadyWaiting = False
    End Sub

    Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
        Dim NewTcs As New TaskCompletionSource(Of Object)
        Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
        MakeWaiter()
        Return NewTcs.Task
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return f(input)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f(input)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Func(Of Task)) As Task
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f().ContinueWith(Function(t As task) As Object
                                               Return Nothing
                                           End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function() As Object
                                       Return f(input)
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f(input)
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(f As Func(Of Object)) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function()
                                       Return f()
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Action) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f()
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function
End Class
于 2012-09-17T22:49:09.553 に答える
0

注:これはレガシーのためにここに残します。WhenAny同時に待機しているタスクが多すぎるため、この方法は使用しないでください。そしてスタックは深くなります。

Stephen Toub によるこのコードに基づく:

const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

私はこれを書きました:

Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
    If Not ThrottleGroups.ContainsKey(GroupId) Then
        ThrottleGroups.Add(GroupId, New List(Of Task))
    End If
    If ThrottleGroups(GroupId).Count < MaxCount Then
        Dim NewTask As Task(Of TResult) = f()
        ThrottleGroups(GroupId).Add(NewTask)
        Return Await NewTask
    Else
        Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
        ThrottleGroups(GroupId).Remove(FinishedTask)
        Return Await ThrottleAsync(f, GroupId, MaxCount)
    End If
End Function

使用するには、次のように置き換えます。

ExampleTaskAsync(param1, param2)

と:

Dim f As Func(Of Task(Of Integer))
f = Function()
        Return ExampleAsync(param1, param2)
    End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)

タスクへの呼び出しを関数でラップする必要があることに注意してくださいf。そうしないと、既にタスクを開始してしまうからです。ThrottleAsync の 2 番目のパラメーターは、「グループ」を識別する任意のオブジェクトです。ひもを使いました。同じ「グループ」内のすべての非同期タスクは、CONCURRENT_TASKSタスク (この場合は 4) に制限されます。

一度に 4 つのスレッドのみを実行する方法を示すサンプル コードを次に示します。 All Ready!サブルーチンは非同期であるため、すぐに表示されます。また、スレッドが順不同で開始または終了した場合でも、「出力」行は入力と同じ順序になります。

Dim results As New List(Of Task(Of Integer))
    For i As Integer = 0 To 20
        Dim j As Integer = i
        Dim f As Func(Of Task(Of Integer))
        f = Function() As Task(Of Integer)
                Return Task.Run(Function() As Integer
                                    Debug.WriteLine(DateTime.Now & "Starting " & j)
                                    System.Threading.Thread.Sleep(5000)
                                    Debug.WriteLine(DateTime.Now & "Ending " & j)
                                    Return j
                                End Function)
            End Function
        Const CONCURRENT_UPLOADS As Integer = 4
        results.Add(ThrottleAsync(f, "PutOjbectAsync", CONCURRENT_UPLOADS))
    Next
    Debug.WriteLine("all ready!")
    For Each x As Task(Of Integer) In results
        Debug.WriteLine(DateTime.Now & "Output: " & Await x)
    Next
于 2012-09-15T17:04:30.260 に答える
0

コードによっては、最も簡単な方法は Parallel.For(Each) を使用し、並列オプションで最大並列処理を指定することです。

于 2012-09-15T18:52:44.697 に答える