22

最終編集:

私はTimothyの回答を選択しましたが、C# のyieldステートメントを活用したよりキュートな実装が必要な場合は、Eamonの回答を確認してください: https://stackoverflow.com/a/19825659/145757


デフォルトでは、 LINQクエリは遅延ストリーミングされます。

ToArray/完全なバッファリングToListを提供しますが、最初は熱心で、次に無限シーケンスで完了するまでにかなりの時間がかかる場合があります。

両方の動作を組み合わせて使用​​する方法はありますか:次のクエリが既にクエリされた要素の生成をトリガーしないように、値が生成されるとその場でストリーミングバッファリングを行います。

基本的な使用例は次のとおりです。

static IEnumerable<int> Numbers
{
    get
    {
        int i = -1;

        while (true)
        {
            Console.WriteLine("Generating {0}.", i + 1);
            yield return ++i;
        }
    }
}

static void Main(string[] args)
{
    IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0);

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }

    Console.WriteLine("==========");

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }
}

出力は次のとおりです。

Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.
==========
Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.

生成コードは 22 回トリガーされます。

列挙型が初めて反復されるときに、11回トリガーされるようにしたいと思います。

その後、2 番目の反復は、既に生成された値の恩恵を受けます。

次のようになります。

IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer();

Rxに精通している人にとっては、ReplaySubject.

4

8 に答える 8

8

これには、F# パワー パックの型を使用できますMicrosoft.FSharp.Collections.LazyList<>(そうです、F# がインストールされていない C# から - 問題ありません!)。Nuget パッケージに入っていますFSPowerPack.Core.Community

特に、実装し、怠惰でキャッシュされてLazyListModule.ofSeq(...)いる a を返す whichを呼び出したいとします。LazyList<T>IEnumerable<T>

あなたの場合、使用法はただの問題です...

var evenNumbers = LazyListModule.ofSeq(Numbers.Where(i => i % 2 == 0));
var cachedEvenNumbers = LazyListModule.ofSeq(evenNumbers);

私は個人的varにはそのようなすべてのケースを好みますが、これはコンパイル時の型が単なるものよりも具体的であることを意味することに注意してくださいIEnumerable<>-これがマイナス面になる可能性があるというわけではありません. F# の非インターフェイス型のもう 1 つの利点は、LazyListModule.skip.

スレッドセーフかどうかはわかりませんLazyListが、そうであると思われます。


以下のコメントで指摘されている別の代替手段 (F# がインストールされている場合) はSeqModule.Cache(namespace Microsoft.FSharp.Collections、GACed アセンブリ FSharp.Core.dll にあります) であり、同じ効果的な動作をします。他の .NET 列挙型と同様に、Seq.cache効率的に連鎖できる末尾 (またはスキップ) 演算子がありません。

スレッドセーフ:この質問に対する他の解決策とは異なり、Seq.cacheは複数の列挙子を並行して実行できるという意味でスレッドセーフです (各列挙子はスレッドセーフではありません)。

パフォーマンス簡単なベンチマークを行ったところ、LazyListenumerable にはバリアントよりも少なくとも 4 倍のSeqModule.Cacheオーバーヘッドがあり、カスタム実装の回答よりも少なくとも 3 倍のオーバーヘッドがあります。そのため、F# バリアントは動作しますが、それほど高速ではありません。3〜12倍遅くても、(たとえば)I / Oまたは自明でない計算を行う列挙型と比較してそれほど遅くはないことに注意してください。したがって、これはほとんどの場合問題にならないでしょうが、維持するのは良いことですマインド。

TL;DR効率的でスレッドセーフなキャッシュされた列挙型が必要な場合は、単に使用してSeqModule.Cacheください。

于 2013-11-06T23:28:49.063 に答える
7

上記の Eamon の回答に基づいて、同時評価でも機能する別の機能ソリューション (新しいタイプはありません) を次に示します。これは、一般的なパターン (共有状態の反復) がこの問題の根底にあることを示しています。

まず、非常に一般的なヘルパー メソッドを定義します。これは、C# の無名イテレーターに欠けている機能をシミュレートできるようにするためのものです。

public static IEnumerable<T> Generate<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

生成は、状態を持つアグリゲーターのようなものです。yield return初期状態を返す関数と、C# で許可されている場合は無名のジェネレーター関数を受け入れます。によって返される状態はinitialize列挙ごとであることを意図していますが、よりグローバルな状態 (すべての列挙間で共有) は、以下に示すように、たとえばクロージャ変数で生成する呼び出し元によって維持できます。

これを「バッファリングされた Enumerable」問題に使用できます。

public static IEnumerable<T> Cached<T>(IEnumerable<T> enumerable)
{
    var cache = new List<T>();
    var enumerator = enumerable.GetEnumerator();

    return Generate<T>(() =>
    {
        int pos = -1;
        return () => {
            pos += 1;
            if (pos < cache.Count())
            {
                return new Tuple<T>(cache[pos]);
            }
            if (enumerator.MoveNext())
            {
                cache.Add(enumerator.Current);
                return new Tuple<T>(enumerator.Current);
            }
            return null;
        };
    });
}
于 2013-11-07T05:54:52.537 に答える
7

I hope this answer combines the brevity and clarity of sinelaw's answer and the support for multiple enumerations of Timothy's answer:

public static IEnumerable<T> Cached<T>(this IEnumerable<T> enumerable) {
    return CachedImpl(enumerable.GetEnumerator(), new List<T>());
}

static IEnumerable<T> CachedImpl<T>(IEnumerator<T> source, List<T> buffer) {
    int pos=0;
    while(true) {
        if(pos == buffer.Count) 
            if (source.MoveNext()) 
                buffer.Add(source.Current); 
            else 
                yield break;
        yield return buffer[pos++];
    }
}

Key ideas are to use the yield return syntax to make for a short enumerable implementation, but you still need a state-machine to decide whether you can get the next element from the buffer, or whether you need to check the underlying enumerator.

Limitations: This makes no attempt to be thread-safe, nor does it dispose the underlying enumerator (which, in general, is quite tricky to do as the underlying uncached enumerator must remain undisposed as long as any cached enumerabl might still be used).

于 2013-11-07T00:22:08.990 に答える
3

これは、不完全ではあるがコンパクトな「関数型」実装です (新しい型は定義されていません)。

バグは、同時列挙ができないことです。


元の説明: 最初の関数は 2 番目の関数内の無名ラムダである必要がありましたが、 C# では無名ラムダを使用できませんyield

// put these in some extensions class

private static IEnumerable<T> EnumerateAndCache<T>(IEnumerator<T> enumerator, List<T> cache)
{
    while (enumerator.MoveNext())
    {
        var current = enumerator.Current;
        cache.Add(current);
        yield return current;
    }
}
public static IEnumerable<T> ToCachedEnumerable<T>(this IEnumerable<T> enumerable)
{
    var enumerator = enumerable.GetEnumerator();
    var cache = new List<T>();
    return cache.Concat(EnumerateAndCache(enumerator, cache));
}

使用法:

var enumerable = Numbers.ToCachedEnumerable();
于 2013-11-06T23:56:27.593 に答える
3

Eamon Nerbonnesinelawの回答に完全に感謝します。まず、列挙子が完了したら解放します。2 つ目は、基礎となる列挙子をロックで保護して、列挙子を複数のスレッドで安全に使用できるようにすることです。

// This is just the same as @sinelaw's Generator but I didn't like the name
public static IEnumerable<T> AnonymousIterator<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

// Cached/Buffered/Replay behaviour
public static IEnumerable<T> Buffer<T>(this IEnumerable<T> self)
{
    // Rows are stored here when they've been fetched once
    var cache = new List<T>();

    // This counter is thread-safe in that it is incremented after the item has been added to the list,
    // hence it will never give a false positive. It may give a false negative, but that falls through
    // to the code which takes the lock so it's ok.
    var count = 0;

    // The enumerator is retained until it completes, then it is discarded.
    var enumerator = self.GetEnumerator();

    // This lock protects the enumerator only. The enumerable could be used on multiple threads
    // and the enumerator would then be shared among them, but enumerators are inherently not
    // thread-safe so a) we must protect that with a lock and b) we don't need to try and be
    // thread-safe in our own enumerator
    var lockObject = new object();

    return AnonymousIterator<T>(() =>
    {
        int pos = -1;
        return () =>
        {
            pos += 1;
            if (pos < count)
            {
                return new Tuple<T>(cache[pos]);
            }
            // Only take the lock when we need to
            lock (lockObject)
            {
                // The counter could have been updated between the check above and this one,
                // so now we have the lock we must check again
                if (pos < count)
                {
                    return new Tuple<T>(cache[pos]);
                }

                // Enumerator is set to null when it has completed
                if (enumerator != null)
                {
                    if (enumerator.MoveNext())
                    {
                        cache.Add(enumerator.Current);
                        count += 1;
                        return new Tuple<T>(enumerator.Current);
                    }
                    else
                    {
                        enumerator = null;
                    }
                }
            }
        }
        return null;
    };
});

}

于 2015-01-02T14:47:38.860 に答える
0

次の拡張メソッドを使用します。

このようにして、入力は最大速度で読み取られ、消費者は最大速度で処理されます。

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> input)
{
    var blockingCollection = new BlockingCollection<T>();

    //read from the input
    Task.Factory.StartNew(() =>
    {
        foreach (var item in input)
        {
            blockingCollection.Add(item);
        }

        blockingCollection.CompleteAdding();
    });

    foreach (var item in blockingCollection.GetConsumingEnumerable())
    {
        yield return item;
    }
}

使用例

この例には、高速のプロデューサー (ファイルの検索) と低速のコンシューマー (ファイルのアップロード) があります。

long uploaded = 0;
long total = 0;

Directory
    .EnumerateFiles(inputFolder, "*.jpg", SearchOption.AllDirectories)
    .Select(filename =>
    {
        total++;
        return filename;
    })
    .Buffer()
    .ForEach(filename =>
    {
        //pretend to do something slow, like upload the file.
        Thread.Sleep(1000);
        uploaded++;

        Console.WriteLine($"Uploaded {uploaded:N0}/{total:N0}");
    });

ここに画像の説明を入力

于 2021-12-17T08:17:09.917 に答える