9

並列タスク/待機で.NETイテレータを使用したいのですが。このようなもの:

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s=>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done                
            yield return ExecuteOrDownloadSomething(s);
        }
}

残念ながら、.NETはこれをネイティブに処理できません。@svickによるこれまでのベストアンサー-AsParallel()を使用します。

ボーナス:複数のパブリッシャーと単一のサブスクライバーを実装する単純な非同期/待機コードはありますか?加入者は譲歩し、パブは処理します。(コアライブラリのみ)

4

3 に答える 3

11

これはPLINQの仕事のようです。

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

これにより、限られた数のスレッドを使用してデリゲートが並行して実行され、完了するとすぐに各結果が返されます。

ExecuteOrDownloadSomething()メソッドがIOバウンドであり(たとえば、実際に何かをダウンロードする)、スレッドを無駄にしたくない場合は、-を使用するasyncawaitが理にかなっているかもしれませんが、より複雑になります。

を十分に活用したい場合は、同期しているためasync、を返さないでくださいIEnumerable(つまり、使用可能なアイテムがない場合はブロックされます)。必要なのはある種の非同期コレクションであり、そのためにTPL DataflowからISourceBlock(具体的には)を使用できます。TransformBlock

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}

ソースが「遅い」場合(つまり、Foo()反復sourceが完了する前から結果の処理を開始したい場合)、を移動して別のforeachComplete()呼び出すことをお勧めしますTask。さらに良い解決策はsourceISourceBlock<TSrc>あまりにもすることです。

于 2013-02-11T10:11:34.097 に答える
1

したがって、本当にやりたいことは、タスクがいつ完了したかに基づいて一連のタスクを順序付けることであるように見えます。これはそれほど複雑ではありません。

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
    var input = tasks.ToList();

    var output = input.Select(task => new TaskCompletionSource<T>());
    var collection = new BlockingCollection<TaskCompletionSource<T>>();
    foreach (var tcs in output)
        collection.Add(tcs);

    foreach (var task in input)
    {
        task.ContinueWith(t =>
        {
            var tcs = collection.Take();
            switch (task.Status)
            {
                case TaskStatus.Canceled:
                    tcs.TrySetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.TrySetException(task.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.TrySetResult(task.Result);
                    break;
            }
        }
        , CancellationToken.None
        , TaskContinuationOptions.ExecuteSynchronously
        , TaskScheduler.Default);
    }

    return output.Select(tcs => tcs.Task);
}

したがって、ここでは、TaskCompletionSource入力タスクごとにを作成し、次に各タスクを実行して、から次の完了ソースを取得してBlockingCollectionその結果を設定する継続を設定します。完了した最初のタスクは返された最初のtcsを取得し、完了した2番目のタスクは返された2番目のtcsを取得します。

これで、コードは非常に単純になります。

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
    .Order();
foreach(var task in tasks)
{
    var result = task.Result;//or you could `await` each result
    //....
}
于 2013-02-11T15:16:25.253 に答える
0

MSロボティクスチームによって作成された非同期ライブラリには、イテレータを使用して非同期コードを生成できる同時実行プリミティブがありました。

ライブラリ(CCR)は無料です(以前は無料でした)。素敵な紹介記事はここにあります:同時事

おそらく、このライブラリを.Netタスクライブラリと一緒に使用することができます。または、「自分でロールする」ように促します。

于 2013-02-11T08:52:01.833 に答える