0

私は最初の並列アプリケーションを書き始めています。IDataReaderこのパーティショナーは、データソースから一度にプルchunkSizeするレコードを列挙します。

TLDR; バージョン

private object _Lock = new object();
public IEnumerator GetEnumerator()
{
    var infoSource = myInforSource.GetEnumerator();
                   //Will this cause a deadlock if two threads 
    lock (_Lock)   //use the enumator at the same time?
    {
        while (infoSource.MoveNext())
        {
            yield return infoSource.Current;
        }
    }
}

完全なコード

protected class DataSourcePartitioner<object[]> : System.Collections.Concurrent.Partitioner<object[]>
{
    private readonly System.Data.IDataReader _Input;
    private readonly int _ChunkSize;
    public DataSourcePartitioner(System.Data.IDataReader input, int chunkSize = 10000)
        : base()
    {
        if (chunkSize < 1)
            throw new ArgumentOutOfRangeException("chunkSize");
        _Input = input;
        _ChunkSize = chunkSize;
    }

    public override bool SupportsDynamicPartitions { get { return true; } }

    public override IList<IEnumerator<object[]>> GetPartitions(int partitionCount)
    {

        var dynamicPartitions = GetDynamicPartitions();
        var partitions =
            new IEnumerator<object[]>[partitionCount];

        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = dynamicPartitions.GetEnumerator();
        }
        return partitions;


    }

    public override IEnumerable<object[]> GetDynamicPartitions()
    {
        return new ListDynamicPartitions(_Input, _ChunkSize);
    }
    private class ListDynamicPartitions : IEnumerable<object[]>
    {
        private System.Data.IDataReader _Input;
        int _ChunkSize;
        private object _ChunkLock = new object();
        public ListDynamicPartitions(System.Data.IDataReader input, int chunkSize)
        {
            _Input = input;
            _ChunkSize = chunkSize;
        }

        public IEnumerator<object[]> GetEnumerator()
        {

            while (true)
            {
                List<object[]> chunk = new List<object[]>(_ChunkSize);
                lock(_Input)
                {
                    for (int i = 0; i < _ChunkSize; ++i)
                    {
                        if (!_Input.Read())
                            break;
                        var values = new object[_Input.FieldCount];
                        _Input.GetValues(values);
                        chunk.Add(values);
                    }
                    if (chunk.Count == 0)
                        yield break;
                }
                var chunkEnumerator = chunk.GetEnumerator();
                lock(_ChunkLock) //Will this cause a deadlock?
                {
                    while (chunkEnumerator.MoveNext())
                    {
                        yield return chunkEnumerator.Current;
                    }
                }
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<object[]>)this).GetEnumerator();
        }
    }
}

IEnumerable渡されたオブジェクトをスレッドセーフにしたかったのですが( MSDNの例では、PLINQとTPLがそれを必要とする可能性があると想定しています)_ChunkLock、下部近くのロックはスレッドセーフを提供するのに役立ちますか、それともデッドロックを引き起こしますか?ドキュメントから、ロックが解放されるかどうかはわかりませんでしたyeld return

また、私がやろうとしていることを実行する.netの機能が組み込まれている場合は、それを使用したいと思います。また、コードに他の問題が見つかった場合は、よろしくお願いします。

4

2 に答える 2

1

テストフレームワークを作成しました。デッドロックは発生しませんが、2番目のスレッドがデータを取得することはありません。

static void Main()
{
    En en = new En();
    Task.Factory.StartNew(() =>
        {
            foreach (int i in en)
            {
                Thread.Sleep(100);
                Console.WriteLine("A:" + i.ToString());
            }
        });
    Task.Factory.StartNew(() =>
    {
        foreach (int i in en)
        {
            Thread.Sleep(10);
            Console.WriteLine("B:" +i.ToString());
        }
    });
    Console.ReadLine();
}

public class En : IEnumerable
{
    object _lock = new object();
    static int i = 0;
    public IEnumerator GetEnumerator()
    {
        lock (_lock)
        {
            while (true)
            {
                if (i < 10)
                    yield return i++;
                else
                    yield break;
            }
        }
    }
}

戻り値

A:0
A:1
A:2
A:3
A:4
A:5
A:6
A:7
A:8
A:9

GetEnumeratorこれは、正しく動作するはずの更新バージョンです。

public IEnumerator<object[]> GetEnumerator()
{

    while (true)
    {
        List<object[]> chunk = new List<object[]>(_ChunkSize);
        _ChunkPos = 0;
        lock(_Input)
        {
            for (int i = 0; i < _ChunkSize; ++i)
            {
                if (!_Input.Read())
                    break;
                var values = new object[_Input.FieldCount];
                _Input.GetValues(values);
                chunk.Add(values);
            }
            if (chunk.Count == 0)
                yield break;
        }
        var chunkEnumerator = chunk.GetEnumerator();
        while (true)
        {
            object[] retVal;
            lock (_ChunkLock)
            {
                if (chunkEnumerator.MoveNext())
                {
                    retVal = chunkEnumerator.Current;
                }
                else 
                    break; //break out of chunk while loop.
            }
            yield return retVal;
        }
    }
}
于 2010-05-28T20:48:55.807 に答える
1

一言で言えば:多分*

foreachループのコンテキストで常にこのコードを使用している場合は、デッドロックが発生する可能性は低くなります(ただし、myInfoSource無限である可能性がある場合、またはforeachループにコードが含まれていて終了しない可能性がある場合を除く)。速度が低下する可能性があります。

潜在的な(実際には保証されている)デッドロックのより可能性の高い原因は次のとおりです。

var myObject = new YourObject();
var enumerator = myObject.GetEnumerator();

// if you do this, and then forget about it...
enumerator.MoveNext();

// ...your lock will never be released

*この回答は、最初のコードブロックに基づいています。

于 2010-05-28T20:58:29.700 に答える