23

itemsLINQ 式の結果が提供されます。

var items = from item in ItemsSource.RetrieveItems()
            where ...

各アイテムの生成に無視できない時間がかかるとします。

次の 2 つの操作モードが可能です。

  1. を使用foreachすると、最終的に利用可能になるよりもはるかに早く、コレクションの最初のアイテムで作業を開始できます。ただし、後で同じコレクションを再度処理したい場合は、コピーして保存する必要があります。

    var storedItems = new List<Item>();
    foreach(var item in items)
    {
        Process(item);
        storedItems.Add(item);
    }
    
    // Later
    foreach(var item in storedItems)
    {
        ProcessMore(item);
    }
    

    作ったばかりだっforeach(... in items)たらItemsSource.RetrieveItems()、また呼ばれるからです。

  2. .ToList()前もって使用することもできますが、最初のアイテムの処理を開始する前に、最後のアイテムが取得されるまで待機する必要があります。

質問:IEnumerable通常の LINQ クエリの結果のように最初は反復する実装がありますが、2 回目foreachは保存された値を反復するように処理中に実体化しますか?

4

4 に答える 4

12

楽しい挑戦なので、私自身の解決策を提供する必要があります。実際、私のソリューションは現在バージョン 3 になっています。バージョン 2 は、Servy からのフィードバックに基づいて単純化したものです。その後、私のソリューションには大きな欠点があることに気付きました。キャッシュされた列挙型の最初の列挙が完了しなかった場合、キャッシュは行われません。多くの LINQ 拡張機能は、ジョブを完了するのに十分な数の列挙可能なものだけを列挙します。これをキャッシュで機能させるには、バージョン 3 に更新する必要がありましたFirstTake

問題は、同時アクセスを伴わない列挙可能な列挙の後続の列挙に関するものです。それにもかかわらず、ソリューションをスレッドセーフにすることにしました。多少の複雑さと多少のオーバーヘッドが追加されますが、ソリューションをすべてのシナリオで使用できるようにする必要があります。

public static class EnumerableExtensions {

  public static IEnumerable<T> Cached<T>(this IEnumerable<T> source) {
    if (source == null)
      throw new ArgumentNullException("source");
    return new CachedEnumerable<T>(source);
  }

}

class CachedEnumerable<T> : IEnumerable<T> {

  readonly Object gate = new Object();

  readonly IEnumerable<T> source;

  readonly List<T> cache = new List<T>();

  IEnumerator<T> enumerator;

  bool isCacheComplete;

  public CachedEnumerable(IEnumerable<T> source) {
    this.source = source;
  }

  public IEnumerator<T> GetEnumerator() {
    lock (this.gate) {
      if (this.isCacheComplete)
        return this.cache.GetEnumerator();
      if (this.enumerator == null)
        this.enumerator = source.GetEnumerator();
    }
    return GetCacheBuildingEnumerator();
  }

  public IEnumerator<T> GetCacheBuildingEnumerator() {
    var index = 0;
    T item;
    while (TryGetItem(index, out item)) {
      yield return item;
      index += 1;
    }
  }

  bool TryGetItem(Int32 index, out T item) {
    lock (this.gate) {
      if (!IsItemInCache(index)) {
        // The iteration may have completed while waiting for the lock.
        if (this.isCacheComplete) {
          item = default(T);
          return false;
        }
        if (!this.enumerator.MoveNext()) {
          item = default(T);
          this.isCacheComplete = true;
          this.enumerator.Dispose();
          return false;
        }
        this.cache.Add(this.enumerator.Current);
      }
      item = this.cache[index];
      return true;
    }
  }

  bool IsItemInCache(Int32 index) {
    return index < this.cache.Count;
  }

  IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
  }

}

拡張子は次のように使用されます (sequenceIEnumerable<T>):

var cachedSequence = sequence.Cached();

// Pulling 2 items from the sequence.
foreach (var item in cachedSequence.Take(2))
  // ...

// Pulling 2 items from the cache and the rest from the source.
foreach (var item in cachedSequence)
  // ...

// Pulling all items from the cache.
foreach (var item in cachedSequence)
  // ...

列挙型の一部のみが列挙されている場合、わずかなリークがあります (例:cachedSequence.Take(2).ToList()によって使用されるToList列挙子は破棄されますが、基になるソース列挙子は破棄されません。これは、最初の 2 つの項目がキャッシュされ、ソース列挙子が有効に保たれるためです。その場合、ソース列挙子は、ガベージ コレクションの対象となる場合にのみクリーンアップされます (これは、おそらく大きなキャッシュと同じ時間になります)。

于 2012-09-14T16:11:25.587 に答える
8

Reactive Extentsionsライブラリを見てください-MemoizeAll()アクセスされたアイテムをIEnumerableにキャッシュし、将来のアクセスのために保存する拡張機能があります。

その他のRxメソッドの詳細については、BartDeSmetによるこのブログ投稿を参照してください。MemoizeAll

編集:これは実際には別のInteractiveExtensionsパッケージに含まれています-NuGetまたはMicrosoftDownloadから入手できます。

于 2012-09-14T15:20:12.723 に答える
3
public static IEnumerable<T> SingleEnumeration<T>(this IEnumerable<T> source)
{
    return new SingleEnumerator<T>(source);
}

private class SingleEnumerator<T> : IEnumerable<T>
{
    private CacheEntry<T> cacheEntry;
    public SingleEnumerator(IEnumerable<T> sequence)
    {
        cacheEntry = new CacheEntry<T>(sequence.GetEnumerator());
    }

    public IEnumerator<T> GetEnumerator()
    {
        if (cacheEntry.FullyPopulated)
        {
            return cacheEntry.CachedValues.GetEnumerator();
        }
        else
        {
            return iterateSequence<T>(cacheEntry).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    using (var iterator = entry.CachedValues.GetEnumerator())
    {
        int i = 0;
        while (entry.ensureItemAt(i) && iterator.MoveNext())
        {
            yield return iterator.Current;
            i++;
        }
    }
}

private class CacheEntry<T>
{
    public bool FullyPopulated { get; private set; }
    public ConcurrentQueue<T> CachedValues { get; private set; }

    private static object key = new object();
    private IEnumerator<T> sequence;

    public CacheEntry(IEnumerator<T> sequence)
    {
        this.sequence = sequence;
        CachedValues = new ConcurrentQueue<T>();
    }

    /// <summary>
    /// Ensure that the cache has an item a the provided index.  If not, take an item from the 
    /// input sequence and move to the cache.
    /// 
    /// The method is thread safe.
    /// </summary>
    /// <returns>True if the cache already had enough items or 
    /// an item was moved to the cache, 
    /// false if there were no more items in the sequence.</returns>
    public bool ensureItemAt(int index)
    {
        //if the cache already has the items we don't need to lock to know we 
        //can get it
        if (index < CachedValues.Count)
            return true;
        //if we're done there's no race conditions hwere either
        if (FullyPopulated)
            return false;

        lock (key)
        {
            //re-check the early-exit conditions in case they changed while we were
            //waiting on the lock.

            //we already have the cached item
            if (index < CachedValues.Count)
                return true;
            //we don't have the cached item and there are no uncached items
            if (FullyPopulated)
                return false;

            //we actually need to get the next item from the sequence.
            if (sequence.MoveNext())
            {
                CachedValues.Enqueue(sequence.Current);
                return true;
            }
            else
            {
                FullyPopulated = true;
                return false;
            }
        }
    }
}

そのため、これはマルチスレッド アクセスをサポートするように (実質的に) 編集されています。複数のスレッドがアイテムを要求でき、アイテムごとにキャッシュされます。キャッシュされた値を返すためにシーケンス全体が繰り返されるのを待つ必要はありません。以下は、これを示すサンプル プログラムです。

private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    for (int i = 0; i < maxValue; i++)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", i);
        yield return i;
    }
}

public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .SingleEnumeration();

    int numThreads = 3;
    for (int i = 0; i < numThreads; i++)
    {
        int taskID = i;
        Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}",
                    taskID, value);
            }
        });
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}

ここでの力を理解するには、実際に実行する必要があります。1 つのスレッドが次の実際の値を強制的に生成するとすぐに、残りのすべてのスレッドがその生成された値をすぐに出力できますが、そのスレッドが出力するキャッシュされていない値がない場合はすべて待機します。(明らかに、スレッド/スレッドプールのスケジューリングにより、1 つのタスクがその値を出力するのに必要以上に時間がかかる場合があります。)

于 2012-09-14T15:10:10.730 に答える
0

Martin Liversage と Servy によるCached/演算子のスレッドセーフな実装が既に投稿されており、 System.Interactiveパッケージのスレッドセーフな演算子も利用できます。スレッドセーフが要件ではなく、スレッド同期のコストを支払うことが望ましくない場合は、この質問で非同期実装を提供する回答があります。これらの実装はすべて、カスタム型に基づいているという共通点があります。私の課題は、単一の自己完結型拡張メソッド (文字列が添付されていない) で、同様の非同期演算子を作成することでした。これが私の実装です:SingleEnumerationMemoiseToCachedEnumerable

public static IEnumerable<T> MemoiseNotSynchronized<T>(this IEnumerable<T> source)
{
    // Argument validation omitted
    IEnumerator<T> enumerator = null;
    List<T> buffer = null;
    return Implementation();

    IEnumerable<T> Implementation()
    {
        if (buffer != null && enumerator == null)
        {
            // The source has been fully enumerated
            foreach (var item in buffer) yield return item;
            yield break;
        }

        enumerator ??= source.GetEnumerator();
        buffer ??= new();
        for (int i = 0; ; i = checked(i + 1))
        {
            if (i < buffer.Count)
            {
                yield return buffer[i];
            }
            else if (enumerator.MoveNext())
            {
                Debug.Assert(buffer.Count == i);
                var current = enumerator.Current;
                buffer.Add(current);
                yield return current;
            }
            else
            {
                enumerator.Dispose(); enumerator = null;
                yield break;
            }
        }
    }
}

使用例:

IEnumerable<Point> points = GetPointsFromDB().MemoiseNotSynchronized();
// Enumerate the 'points' any number of times, on a single thread.
// The data will be fetched from the DB only once.
// The connection with the DB will open when the 'points' is enumerated
// for the first time, partially or fully.
// The connection will stay open until the 'points' is enumerated fully
// for the first time.

FiddleMemoiseNotSynchronizedでオペレーターをテストします。

于 2021-08-16T00:49:39.940 に答える