1

Reading and Writing in 2 parallel tasks as shown below:

Task[] tasks = new Task[2];
var entityCollection = new BlockingCollection<Dictionary<String, object>>();

tasks[0] = Task.Factory.StartNew(() => ReadData(entityCollection), TaskCreationOptions.LongRunning);
tasks[1] = Task.Factory.StartNew(() => WriteJsontoFile(JSONFileName, entityCollection), TaskCreationOptions.LongRunning);

Task.WaitAll(tasks);

Read Task:

private void ReadData(BlockingCollection<Dictionary<String, object>> collection)
{
    do
    {
        //continuously data is being read in to entities, this part is working fine and then adding it to collection of BlockingCollection type to be consumed in Write task
        entitites.ToList().ForEach(e => collection.Add(e));
    } while (true);
    collection.CompleteAdding();
}

Write Task:

private void WriteJsontoFile(String JsonFileName, BlockingCollection<Dictionary<String, object>> source)
{
    using (StreamWriter sw = new StreamWriter(JsonFileName, true))
    {
        Parallel.ForEach(source.GetConsumingPartitioner(), (line) => ser.Serialize(sw, line));
    }
}

GetConsumingPartitioner() related code:

public static class BlockingCollection
{
    public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }
}

class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");
        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ =>
            dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

I am getting this below exception inside Write task:

Count cannot be less than zero.\r\nParameter name: count

4

1 に答える 1

1

これは、消費するための標準的な構文ではありません

BlockingCollection クラス

// Consume consume the BlockingCollection 
while (true) Console.WriteLine(bc.Take());
于 2013-10-27T13:08:55.377 に答える