1

IProducerConsumerCollection(T)別のスレッドが消費するアイテムを定期的に追加するインターフェイスから を公開しています。

public interface IProducer<T>
{
    IProducerConsumerCollection<T> ProducerCollection { get; }
}

BlockingCollection(T)既存のコレクションでを使用しようとしましたが、 へのIProducerConsumerCollection(T)直接の追加はサポートされていないようです:

var queue = new ConcurrentQueue<string>();
var blockingCollection = new BlockingCollection<string>(queue);

var task1 = Task.Run(() => {
    Console.WriteLine("Dequeued " + blockingCollection.Take());
    Console.WriteLine("Dequeued " + blockingCollection.Take());
});

var task2 = Task.Run(() => {
    Console.WriteLine("Enqueueing Hello");
    queue.Enqueue("Hello");

    Console.WriteLine("Enqueueing World");
    queue.Enqueue("World");
});

Task.WaitAll(task1, task2);

BlockingCollection(T)は新しいアイテムに気付かないため、これは無期限にハングします。

メソッドに類似した機能はありますか、BlockingCollection(T).Takeそれともより単純なものはありますか:

static async Task<T> TakeAsync<T>(
    IProducerConsumerCollection<T> collection,
    CancellationToken token
)
{
    T result;

    while(!collection.TryTake(out result))
    {
        await Task.Yield();
        token.ThrowIfCancellationRequested();
    }

    return result;
}
4

1 に答える 1