27

次のような非同期述語メソッドがあります。

private async Task<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

のコレクションがあるとしますUri:

var addresses = new[]
{
    new Uri("http://www.google.com/"),
    new Uri("http://www.stackoverflow.com/") //etc.
};

addressesを使ってフィルタリングしたいMeetsCriteria。これを非同期で行いたいです。述語への複数の呼び出しを非同期で実行したいので、すべての呼び出しが完了するのを待って、フィルター処理された結果セットを生成したいと考えています。残念ながら、LINQ は非同期述語をサポートしていないように見えるため、次のようなものは機能しません

var filteredAddresses = addresses.Where(MeetsCriteria);

これを行う同様に便利な方法はありますか?

4

5 に答える 5

21

このようなものがフレームワークにない理由の1つは、考えられるバリエーションがたくさんあり、特定の状況ではそれぞれの選択肢が正しいものになることだと思います。

  • 述語は並列で実行する必要がありますか、それとも直列で実行する必要がありますか?
    • それらが並列で実行される場合、それらはすべて一度に実行する必要がありますか、それとも並列処理の程度を制限する必要がありますか?
    • それらが並行して実行される場合、結果は元のコレクションと同じ順序、完了の順序、または未定義の順序である必要がありますか?
      • 完了順に返される必要がある場合、完了時に結果を(非同期で)取得する方法はありますか?(これには、リターンタイプをから別のTask<IEnumerable<T>>ものに変更する必要があります。)

述語を並行して実行したいとおっしゃいました。その場合、最も簡単な選択は、それらをすべて一度に実行し、完了順に返すことです。

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = new ConcurrentQueue<T>();
    var tasks = source.Select(
        async x =>
        {
            if (await predicate(x))
                results.Enqueue(x);
        });
    await Task.WhenAll(tasks);
    return results;
}

次に、次のように使用できます。

var filteredAddresses = await addresses.Where(MeetsCriteria);
于 2013-02-15T12:56:52.597 に答える
9

最初のアプローチ: すべてのリクエストを前もって次々に発行し、すべてのリクエストが戻ってくるまで待ってから、結果をフィルタリングします。(svick のコードもこれを行いましたが、ここでは中間の ConcurrentQueue なしで行っています)。

// First approach: massive fan-out
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) });
var addressesAndCriteria = await Task.WhenAll(tasks);
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A);

2 番目のアプローチ: 要求を次々に実行します。これには時間がかかりますが、膨大なリクエストの猛攻撃で Web サービスを攻撃しないようにします (MeetsCriteriaAsync が Web サービスに送信されると仮定します...)。

// Second approach: one by one
var filteredAddresses = new List<Uri>();
foreach (var a in filteredAddresses)
{
  if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a);
}

3 番目のアプローチ: 2 番目と同様ですが、架空の C#8 機能「非同期ストリーム」を使用します。C#8 はまだリリースされておらず、非同期ストリームはまだ設計されていませんが、夢を見ることはできます! IAsyncEnumerable 型は RX に既に存在しており、うまくいけば、さらにコンビネータが追加されるでしょう。IAsyncEnumerable の良いところは、すべてが最初にフィルター処理されるのを待つのではなく、最初のいくつかのfilteredAddresses が来るとすぐに消費を開始できることです。

// Third approach: ???
IEnumerable<Uri> addresses = {...};
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync);

4 番目のアプローチ: Web サービスにすべてのリクエストを一度にぶつけたくないかもしれませんが、一度に複数のリクエストを発行しても問題ありません。たぶん、私たちは実験をして、「一度に3つ」が幸せな媒体であることを発見しました. 注: このコードは、UI プログラミングや ASP.NET などのシングルスレッド実行コンテキストを想定しています。マルチスレッド実行コンテキストで実行されている場合は、代わりに ConcurrentQueue と ConcurrentList が必要です。

// Fourth approach: throttle to three-at-a-time requests
var addresses = new Queue<Uri>(...);
var filteredAddresses = new List<Uri>();
var worker1 = FilterAsync(addresses, filteredAddresses);
var worker2 = FilterAsync(addresses, filteredAddresses);
var worker3 = FilterAsync(addresses, filteredAddresses);
await Task.WhenAll(worker1, worker2, worker3);

async Task FilterAsync(Queue<Uri> q, List<Uri> r)
{
  while (q.Count > 0)
  {
    var item = q.Dequeue();
    if (await MeetsCriteriaAsync(item)) r.Add(item);
  }
}

TPL データフロー ライブラリを使用して 4 番目のアプローチを行う方法もあります。

于 2016-11-16T17:36:46.070 に答える