9

整数1から5を返す2つのシーケンスがあるとしましょう。

最初は1、2、3を非常に速く返しますが、4と5はそれぞれ200msかかります。

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

2番目は200msの遅延で1、2、および3を返しますが、4と5は高速で返されます。

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

これらの両方のシーケンスを結合すると、1から5までの数字が得られます。

FastFirst().Union(SlowFirst());

2つの方法のどちらがどの時点で遅延があるかを保証できないため、実行の順序で解決策を保証することはできません。したがって、私の例での(人為的な)遅延を最小限に抑えるために、ユニオンを並列化したいと思います。

実際のシナリオ:いくつかのエンティティを返すキャッシュと、すべてのエンティティを返すデータソースがあります。キャッシュされた結果ができるだけ速く生成されるように、リクエストをキャッシュとデータソースの両方に内部的に並列化するメソッドからイテレータを返すことができるようにしたいと思います。

注1:これはまだCPUサイクルを浪費していることに気づきました。シーケンスが遅い要素を反復処理するのをどのように防ぐことができるのか、できるだけ速くそれらを結合する方法を尋ねているのではありません。

更新1:複数のプロデューサーを受け入れ、ContinueWhenAllを使用してBlockingCollectionのCompleteAddingを1回だけ設定するように、achitakaさんのすばらしい応答を調整しました。コメントのフォーマットがないために失われるため、ここに配置しました。それ以上のフィードバックは素晴らしいでしょう!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in result)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
4

2 に答える 2

3

これを見てください。最初のメソッドは、結果が出る順序ですべてを返すだけです。2番目は一意性をチェックします。それらを連鎖させると、あなたが望む結果が得られると思います。

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        var producer1Done = false;
        var producer2Done = false;

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer1)
            {
                resultsQueue.Add(product);
            }
            producer1Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer2)
            {
                resultsQueue.Add(product);
            }
            producer2Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            if (knownResults.Contains(result)) {continue;}
            knownResults.Add(result);
            yield return result;
        }
    }
}
于 2011-11-09T14:04:59.510 に答える
0

キャッシュは、データベースからのフェッチと比較してほぼ瞬時に行われるため、最初にキャッシュから読み取ってそれらのアイテムを返し、次にデータベースから読み取って、キャッシュで見つかったアイテムを除くアイテムを返すことができます。

これを並列化しようとすると、多くの複雑さが追加されますが、得られる利益はごくわずかです。

編集:

ソースの速度に予測可能な違いがない場合は、それらをスレッドで実行し、同期されたハッシュ セットを使用して、既に取得したアイテムを追跡し、新しいアイテムをキューに入れ、メイン スレッドに読み取らせることができます。キューから:

public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
  HashSet<TKey> found = new HashSet<TKey>();
  List<TItem> queue = new List<TItem>();
  object sync = new object();
  int alive = 0;
  object aliveSync = new object();
  foreach (IEnumerable<TItem> source in sources) {
    lock (aliveSync) {
      alive++;
    }
    new Thread(s => {
      foreach (TItem item in s as IEnumerable<TItem>) {
        TKey key = getKey(item);
        lock (sync) {
          if (found.Add(key)) {
            queue.Add(item);
          }
        }
      }
      lock (aliveSync) {
        alive--;
      }
    }).Start(source);
  }
  while (true) {
    lock (sync) {
      if (queue.Count > 0) {
        foreach (TItem item in queue) {
          yield return item;
        }
        queue.Clear();
      }
    }
    lock (aliveSync) {
      if (alive == 0) break;
    }
    Thread.Sleep(100);
  }
}

テスト ストリーム:

public static IEnumerable<int> SlowRandomFeed(Random rnd) {
  int[] values = new int[100];
  for (int i = 0; i < 100; i++) {
    int pos = rnd.Next(i + 1);
    values[i] = i;
    int temp = values[pos];
    values[pos] = values[i];
    values[i] = temp;
  }
  foreach (int value in values) {
    yield return value;
    Thread.Sleep(rnd.Next(200));
  }
}

テスト:

Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
  Console.Write("{0:0000 }", item);
}
于 2011-11-09T13:41:17.720 に答える