.Net で async-await を使用しています。同時非同期呼び出しの数を制限するにはどうすればよいですか?
4 に答える
比較的簡単な方法の 1 つは、TPL データフローを (悪用) 使用することです。何かのようなもの:
public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
int maxDegreeOfParallelism)
{
var outputs = new ConcurrentQueue<TOutput>();
var block = new ActionBlock<TInput>(
async x => outputs.Enqueue(await asyncFunction(x)),
new ExecutionDataflowBlockOptions
{ MaxDgreeOfParallelism = maxDegreeOfParallelism });
foreach (var input in inputs)
block.Send(input);
block.Complete();
block.Completion.Wait();
return outputs.ToArray();
}
私はこのテクニックの方が好きです。TaskCompletionSource
着信タスクの出力タスクを作成するために使用しています。Task
これが必要なのは、実行する前にaを返したいからです。以下のクラスは、各入力Func(of Task(of Object))
をすぐに返されるaTaskCompletionSource
に関連付け、それらをキューに入れます。
キューの要素は実行中のタスクのリストにデキューされ、継続するとが設定されますTaskCompletionSource
。ループ内でを呼び出すとWhenAny
、部屋が空いたときに要素がキューから実行リストに移動します。WhenAny
同時実行性の問題があるかもしれませんが、一度に1つしか存在しないことを確認するためのチェックもあります。
使用するには、次のような同期関数を置き換えるだけです。
Task.Run(AddressOf MySyncFunction) 'possibly many of these
これとともに:
Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.
すでにタスクを返す関数の場合、それらをタスクを返す関数に変換して、thottlerがそれらを実行できるようにすることが重要です。交換:
NewTask = MyFunctionAsync()
と:
NewTask = t1.Run(Function () return MyFunctionAsync())
以下のクラスは、関数が同期/非同期、入力あり/なし、出力あり/なしのいずれであるかに応じて、Throttler.Run()のさまざまなシグネチャも実装します。TaskをTask(Of Output)に変換するのは特に注意が必要です。
Class Throttler
Property MaxCount As Integer
Sub New(Optional MaxCount As Integer = 1)
Me.MaxCount = MaxCount
End Sub
Private Running As New List(Of Task)
Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
Private AlreadyWaiting As Boolean
Async Sub MakeWaiter()
If AlreadyWaiting Then Exit Sub
AlreadyWaiting = True
Do While Waiting.Count > 0
Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
Dim NewTask As Task(Of Object) = NewFunc()
Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
NewTask.ContinueWith(Sub(t2 As Task(Of Object))
CurrentTcs.SetResult(t2.Result)
End Sub)
Running.Add(NewTask)
Loop
If Waiting.Count > 0 Then
Dim Waiter As Task(Of Task)
Waiter = Task.WhenAny(Running)
Dim FinishedTask As Task = Await Waiter
Await FinishedTask
Running.Remove(FinishedTask)
End If
Loop
AlreadyWaiting = False
End Sub
Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
Dim NewTcs As New TaskCompletionSource(Of Object)
Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
MakeWaiter()
Return NewTcs.Task
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return f(input)
End Function
Return Me.Run(NewF)
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f(input)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Func(Of Task)) As Task
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f().ContinueWith(Function(t As task) As Object
Return Nothing
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function() As Object
Return f(input)
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f(input)
End Sub)
End Function
Return Me.Run(NewF)
End Function
Function Run(f As Func(Of Object)) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function()
Return f()
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Action) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f()
End Sub)
End Function
Return Me.Run(NewF)
End Function
End Class
注:これはレガシーのためにここに残します。WhenAny
同時に待機しているタスクが多すぎるため、この方法は使用しないでください。そしてスタックは深くなります。
Stephen Toub によるこのコードに基づく:
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
私はこれを書きました:
Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
If Not ThrottleGroups.ContainsKey(GroupId) Then
ThrottleGroups.Add(GroupId, New List(Of Task))
End If
If ThrottleGroups(GroupId).Count < MaxCount Then
Dim NewTask As Task(Of TResult) = f()
ThrottleGroups(GroupId).Add(NewTask)
Return Await NewTask
Else
Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
ThrottleGroups(GroupId).Remove(FinishedTask)
Return Await ThrottleAsync(f, GroupId, MaxCount)
End If
End Function
使用するには、次のように置き換えます。
ExampleTaskAsync(param1, param2)
と:
Dim f As Func(Of Task(Of Integer))
f = Function()
Return ExampleAsync(param1, param2)
End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)
タスクへの呼び出しを関数でラップする必要があることに注意してくださいf
。そうしないと、既にタスクを開始してしまうからです。ThrottleAsync の 2 番目のパラメーターは、「グループ」を識別する任意のオブジェクトです。ひもを使いました。同じ「グループ」内のすべての非同期タスクは、CONCURRENT_TASKS
タスク (この場合は 4) に制限されます。
一度に 4 つのスレッドのみを実行する方法を示すサンプル コードを次に示します。 All Ready!
サブルーチンは非同期であるため、すぐに表示されます。また、スレッドが順不同で開始または終了した場合でも、「出力」行は入力と同じ順序になります。
Dim results As New List(Of Task(Of Integer))
For i As Integer = 0 To 20
Dim j As Integer = i
Dim f As Func(Of Task(Of Integer))
f = Function() As Task(Of Integer)
Return Task.Run(Function() As Integer
Debug.WriteLine(DateTime.Now & "Starting " & j)
System.Threading.Thread.Sleep(5000)
Debug.WriteLine(DateTime.Now & "Ending " & j)
Return j
End Function)
End Function
Const CONCURRENT_UPLOADS As Integer = 4
results.Add(ThrottleAsync(f, "PutOjbectAsync", CONCURRENT_UPLOADS))
Next
Debug.WriteLine("all ready!")
For Each x As Task(Of Integer) In results
Debug.WriteLine(DateTime.Now & "Output: " & Await x)
Next
コードによっては、最も簡単な方法は Parallel.For(Each) を使用し、並列オプションで最大並列処理を指定することです。