5

この記事の最後では、 GetOrAdd を使用すると (正しく理解している場合) 破損または予期しない結果が生じる可能性について説明しています。

をちょきちょきと切る/

ConcurrentDictionary は、マルチスレッド シナリオ向けに設計されています。コレクションの項目を追加または削除するために、コードでロックを使用する必要はありません。ただし、あるスレッドが値を取得し、別のスレッドが同じキーに新しい値を与えることでコレクションを即座に更新することは常に可能です。

また、ConcurrentDictionary のすべてのメソッドはスレッド セーフですが、すべてのメソッド、特に GetOrAdd と AddOrUpdate がアトミックというわけではありません。これらのメソッドに渡されるユーザー デリゲートは、ディクショナリの内部ロックの外で呼び出されます。(これは、未知のコードがすべてのスレッドをブロックするのを防ぐために行われます。) したがって、次の一連のイベントが発生する可能性があります。

1) threadA は GetOrAdd を呼び出しますが、項目が見つからず、valueFactory デリゲートを呼び出して Add に新しい項目を作成します。

2) スレッド B は同時に GetOrAdd を呼び出し、その valueFactory デリゲートが呼び出され、スレッド A の前に内部ロックに到達するため、その新しいキーと値のペアがディクショナリに追加されます。

3) threadA のユーザー デリゲートが完了し、スレッドがロックに到達しますが、アイテムが既に存在することがわかります。

4) threadA は「Get」を実行し、以前に threadB によって追加されたデータを返します。

したがって、GetOrAdd によって返されるデータが、スレッドの valueFactory によって作成されたデータと同じであるという保証はありません。AddOrUpdate が呼び出されると、同様の一連のイベントが発生する可能性があります。

質問

データを確認して更新を再試行する正しい方法は何ですか? 良いアプローチは、古い値の内容に基づいてこの操作を試行/再試行する拡張メソッドです。

これはどのように実装されますか?結果 ( ) を有効な終了状態として信頼できますverifyか? それとも、別の方法を使用して値を再試行して再取得する必要がありますか?

コード

次のコードでは、値を更新するときに競合状態が発生します。++望ましい動作は、 AddOrUpdateWithoutRetriving() がさまざまな方法で (またはを使用して) さまざまな値をインクリメントすることInterlocked.Increment()です。

また、単一のユニットで複数のフィールド操作を実行し、競合状態のために前回の更新が「実行」されなかった場合に更新を再試行したいと考えています。

コードを実行すると、コンソールに表示される各値が 1 ずつ増加していくのがわかりますが、各値はドリフトし、いくつかの値は数回前後します。

namespace DictionaryHowTo
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // The type of the Value to store in the dictionary:
    class FilterConcurrentDuplicate
    {
        // Create a new concurrent dictionary.
        readonly ConcurrentDictionary<int, TestData> eventLogCache = 
             new ConcurrentDictionary<int, TestData>();

        static void Main()
        {
            FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();

            c.DoRace(null);
        }

        readonly ConcurrentDictionary<int, TestData> concurrentCache = 
            new ConcurrentDictionary<int, TestData>();
        void DoRace(string[] args)
        {
            int max = 1000;

            // Add some key/value pairs from multiple threads.
            Task[] tasks = new Task[3];

            tasks[0] = Task.Factory.StartNew(() =>
            {

                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 500);

                Thread.Sleep(MyRandomNumber);
                AddOrUpdateWithoutRetrieving();

            });

            tasks[1] = Task.Factory.StartNew(() =>
            {
                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 1000);

                Thread.Sleep(MyRandomNumber);

                AddOrUpdateWithoutRetrieving();

            });

            tasks[2] = Task.Factory.StartNew(() =>
            {
                AddOrUpdateWithoutRetrieving();

            });
            // Output results so far.
            Task.WaitAll(tasks);

            AddOrUpdateWithoutRetrieving();

            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }
        public class TestData : IEqualityComparer<TestData>
        {
            public string aStr1 { get; set; }
            public Guid? aGud1 { get; set; }
            public string aStr2 { get; set; }
            public int aInt1 { get; set; }
            public long? aLong1 { get; set; }

            public DateTime aDate1 { get; set; }
            public DateTime? aDate2 { get; set; }

            //public int QueryCount { get; set; }
            public int QueryCount = 0;//

            public string zData { get; set; }
            public bool Equals(TestData x, TestData y)
            {
                return x.aStr1 == y.aStr1 &&
                    x.aStr2 == y.aStr2 &&
                       x.aGud1 == y.aGud1 &&
                       x.aStr2 == y.aStr2 &&
                       x.aInt1 == y.aInt1 &&
                       x.aLong1 == y.aLong1 &&
                       x.aDate1 == y.aDate1 &&
                       x.QueryCount == y.QueryCount ;
            }

            public int GetHashCode(TestData obj)
            {
                TestData ci = (TestData)obj;
                // http://stackoverflow.com/a/263416/328397
                return 
                  new { 
                         A = ci.aStr1, 
                         Aa = ci.aStr2, 
                         B = ci.aGud1, 
                         C = ci.aStr2, 
                         D = ci.aInt1, 
                         E = ci.aLong1, 
                         F = ci.QueryCount , 
                         G = ci.aDate1}.GetHashCode();
            }
        }
        private   void AddOrUpdateWithoutRetrieving()
        {
            // Sometime later. We receive new data from some source.
            TestData ci = new TestData() 
            { 
              aStr1 = "Austin", 
              aGud1 = new Guid(), 
              aStr2 = "System", 
              aLong1 = 100, 
              aInt1 = 1000, 
              QueryCount = 0, 
              aDate1 = DateTime.MinValue
            };

            TestData verify = concurrentCache.AddOrUpdate(123, ci,
                (key, existingVal) =>
                {
                    existingVal.aStr2 = "test1" + existingVal.QueryCount;
                    existingVal.aDate1 = DateTime.MinValue;
                    Console.WriteLine
                     ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
                          "  Query Count A:" + existingVal.QueryCount);
                    Interlocked.Increment(ref existingVal.QueryCount);
                    System.Random RandNum = new System.Random();
                    int MyRandomNumber = RandNum.Next(1, 1000);

                    Thread.Sleep(MyRandomNumber);
                    existingVal.aInt1++;
                    existingVal.aDate1 = 
                         existingVal.aDate1.AddSeconds
                         (existingVal.aInt1);  
                    Console.WriteLine(
                          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
                           "  Query Count B:" + existingVal.QueryCount);
                    return existingVal;
                });


            // After each run, every value here should be ++ the previous value
            Console.WriteLine(
                "Thread:"+Thread.CurrentThread.ManagedThreadId + 
                 ": Query Count returned:" + verify.QueryCount + 
                 " eid:" + verify.aInt1 + " date:" +  
                 verify.aDate1.Hour + " "  + verify.aDate1.Second + 
                 " NAME:" + verify.aStr2
                );
        }

    }
}

出力

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System

Thread:12  Query Count A:0
Thread:13  Query Count A:1
Thread:12  Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11

Thread:12  Query Count A:2
Thread:13  Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12

Thread:13  Query Count A:3
Thread:11  Query Count A:4
Thread:11  Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14

Thread:11  Query Count A:5
Thread:13  Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15

....

Thread:11  Query Count A:658
Thread:11  Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658

Thread:11  Query Count A:659
Thread:11  Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659

Thread:11  Query Count A:660
Thread:11  Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660

Thread:11  Query Count A:661
Thread:11  Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661

このコードでは、「eid」は常にクエリ カウントよりも 1,000 多い必要がありますが、反復中の差は 2 つの間で 1 から 7 まで変化します。この不一致により、一部のアプリケーションが失敗したり、誤ったデータを報告したりする可能性があります。

4

4 に答える 4

4

この提出は、記事「<em>How to: Add and Remove Items from a ConcurrentDictionary」( http://msdn.microsoft.com/en-us/library/dd997369 ) の下部にあるコメントの誤った理解に基づいています。 aspxと、基本的な同時実行バグ (共有オブジェクトの非アトミックな同時変更) について。

まず、リンクされた記事が実際に何を言っているのかを明確にしましょう。例として AddOrUpdate を使用しますが、GetOrAdd の理由は同等です。

たとえば、複数のスレッドから AddOrUpdate を呼び出し、同じキーを指定します。そのキーを持つエントリがすでに存在すると仮定します。各スレッドが表示されます。指定されたキーを持つエントリが既に存在し、AddOrUpdate の Update 部分が関連していることに注意してください。そうすることで、どのスレッドもディクショナリをロックしません。代わりに、いくつかのインターロックされた命令を使用して、エントリ キーが存在するかどうかをアトミックにチェックします。

そのため、いくつかのスレッドはすべて、キーが存在し、updateValueFactory を呼び出す必要があることに気付きました。そのデリゲートは AddOrUpdate に渡されます。既存のキーと値を参照し、更新値を返します。これで、関連するすべてのスレッドがファクトリを同時に呼び出します。それらはすべて、以前は未知の順序で完了し、すべてのスレッドはアトミック操作 (インターロックされた命令を使用) を使用して、既存の値を計算したばかりの値に置き換えようとします。どのスレッドが「勝つ」かを知る方法はありません。勝ったスレッドは、計算された値を保存します。他の人は、ディクショナリの値が、引数として updateValueFactory に渡された値ではなくなっていることに気付くでしょう。その認識に応じて、彼らは操作を放棄し、計算されたばかりの値を捨てます。

次に、ここにリストされているコード サンプルを実行すると、奇妙な値が得られる理由を明確にします。

AddOrUpdate に渡された updateValueFactory デリゲートは、既存のキーと値を REFERENCES して更新値を返すことを思い出してください。AddOrUpdateWithoutRetriving() メソッドのコード サンプルは、その参照に対して直接操作を実行し始めます。新しい置換値を作成して THAT を変更する代わりに、existingVal のインスタンス メンバー値 (既に辞書にあるオブジェクト) を変更し、単にその参照を返します。そして、それは原子的にではなく、いくつかの値を読み取り、いくつかの値を更新し、さらに読み取り、さらに更新します。もちろん、これは複数のスレッドで同時に発生することを上で見ました。それらはすべて同じオブジェクトを変更します。その結果、どの時点でも (コード サンプルが WriteLine を呼び出したとき)、オブジェクトには異なるスレッドから生成されたメンバー インスタンス値が含まれているのも不思議ではありません。

ディクショナリはこれとは関係ありません。コードは、スレッド間で非アトミックに共有されるオブジェクトを変更するだけです。これは、最も一般的な並行性のバグの 1 つです。最も一般的な 2 つの回避策は、シナリオによって異なります。共有ロックを使用してオブジェクト全体の変更をアトミックにするか、最初にオブジェクト全体をアトミックにコピーしてからローカル コピーを変更します。

後者については、これを TestData クラスに追加してみてください。

private Object _copyLock = null;

private Object GetLock() {

    if (_copyLock != null)
        return _copyLock;

    Object newLock = new Object();
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);
    return (prevLock == null) ? newLock : prevLock;
}

public TestData Copy() {

    lock (GetLock()) {
        TestData copy = new TestData();
        copy.aStr1 = this.aStr1;
        copy.aStr2 = this.aStr2;
        copy.aLong1 = this.aLong1;
        copy.aInt1 = this.aInt1;
        copy.QueryCount = this.QueryCount;
        copy.aDate1 = this.aDate1;
        copy.aDate2 = this.aDate2;
        copy.zData = this.zData;

        return copy;
    }
}

次に、ファクトリを次のように変更します。

TestData verify = concurrentCache.AddOrUpdate(123, ci,
    (key, existingVal) =>
    {
        TestData newVal = existingVal.Copy();
        newVal.aStr2 = "test1" + newVal.QueryCount;
        newVal.aDate1 = DateTime.MinValue;
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count A:" + newVal.QueryCount);
        Interlocked.Increment(ref newVal.QueryCount);
        System.Random RandNum = new System.Random();
        int MyRandomNumber = RandNum.Next(1, 1000);

        Thread.Sleep(MyRandomNumber);
        newVal.aInt1++;
        newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count B:" + newVal.QueryCount);
        return newVal;
    });

これが役立つことを願っています。

于 2012-08-16T20:00:01.173 に答える
3

おそらく正しい方法は、返された値が によって作成されたものではないかどうかを気にしないことですvalueFactory。これが受け入れられない場合は、ロックを使用する必要があります。

于 2012-05-19T18:15:53.657 に答える
2

常に機能する一般的な保護はありません。Lazy<T>ただし、一般的な回避策は、 の代わりにを返すことTです。そうすれば、不要な遅延を作成しても害はありません。開始されることはないからです。キーに対応する最終的な値になるのは、1 つの Lazy だけです。特定の Lazy インスタンスが 1 つだけ返されます。

于 2012-05-19T22:00:53.867 に答える
1

男性自身からのこの実装を使用できます。GetOrAddここでも、結果をディクショナリに追加せずにファクトリを呼び出すことができることに注意してください。しかし、あなたは何が起こったのかを示すでしょう。

于 2012-05-20T09:24:48.220 に答える