この記事の最後では、 GetOrAdd を使用すると (正しく理解している場合) 破損または予期しない結果が生じる可能性について説明しています。
をちょきちょきと切る/
ConcurrentDictionary は、マルチスレッド シナリオ向けに設計されています。コレクションの項目を追加または削除するために、コードでロックを使用する必要はありません。ただし、あるスレッドが値を取得し、別のスレッドが同じキーに新しい値を与えることでコレクションを即座に更新することは常に可能です。
また、ConcurrentDictionary のすべてのメソッドはスレッド セーフですが、すべてのメソッド、特に GetOrAdd と AddOrUpdate がアトミックというわけではありません。これらのメソッドに渡されるユーザー デリゲートは、ディクショナリの内部ロックの外で呼び出されます。(これは、未知のコードがすべてのスレッドをブロックするのを防ぐために行われます。) したがって、次の一連のイベントが発生する可能性があります。
1) threadA は GetOrAdd を呼び出しますが、項目が見つからず、valueFactory デリゲートを呼び出して Add に新しい項目を作成します。
2) スレッド B は同時に GetOrAdd を呼び出し、その valueFactory デリゲートが呼び出され、スレッド A の前に内部ロックに到達するため、その新しいキーと値のペアがディクショナリに追加されます。
3) threadA のユーザー デリゲートが完了し、スレッドがロックに到達しますが、アイテムが既に存在することがわかります。
4) threadA は「Get」を実行し、以前に threadB によって追加されたデータを返します。
したがって、GetOrAdd によって返されるデータが、スレッドの valueFactory によって作成されたデータと同じであるという保証はありません。AddOrUpdate が呼び出されると、同様の一連のイベントが発生する可能性があります。
質問
データを確認して更新を再試行する正しい方法は何ですか? 良いアプローチは、古い値の内容に基づいてこの操作を試行/再試行する拡張メソッドです。
これはどのように実装されますか?結果 ( ) を有効な終了状態として信頼できますverify
か? それとも、別の方法を使用して値を再試行して再取得する必要がありますか?
コード
次のコードでは、値を更新するときに競合状態が発生します。++
望ましい動作は、 AddOrUpdateWithoutRetriving() がさまざまな方法で (またはを使用して) さまざまな値をインクリメントすることInterlocked.Increment()
です。
また、単一のユニットで複数のフィールド操作を実行し、競合状態のために前回の更新が「実行」されなかった場合に更新を再試行したいと考えています。
コードを実行すると、コンソールに表示される各値が 1 ずつ増加していくのがわかりますが、各値はドリフトし、いくつかの値は数回前後します。
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
出力
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
このコードでは、「eid」は常にクエリ カウントよりも 1,000 多い必要がありますが、反復中の差は 2 つの間で 1 から 7 まで変化します。この不一致により、一部のアプリケーションが失敗したり、誤ったデータを報告したりする可能性があります。