大きな編集
ConcurrentQueue は、Enqueue(T) および T Dequeue() 操作に関してのみ安全です。その上でforeachを実行していますが、必要なレベルで同期されません。特定のケースでの最大の問題は、キュー (それ自体がコレクションである) を列挙すると、よく知られている「コレクションが変更されました」という例外がスローされる可能性があるという事実です。なぜそれが最大の問題なのですか?後でキューに物を追加しているため対応するオブジェクトをリストに追加しました(リストを同期する必要性も大いにありますが、最大の問題はたった1つの「弾丸」で解決されます)。コレクションを列挙している間、別のスレッドがそれを変更しているという事実を飲み込むのは簡単ではありません (顕微鏡レベルで変更が安全であっても - ConcurrentQueue はまさにそれを行います)。
したがって、別の同期手段を使用して、キュー (およびそこにいる間は中央のリスト) へのアクセスを同期する必要があります (つまり、ConcurrentQueue を忘れて、単純なキューまたはリストを使用することもできます。決してデキューしないでください)。
だから、次のようなことをしてください:
public void Writer(object toWrite) {
this.rwLock.EnterWriteLock();
try {
int tailIndex = this.list.Count;
this.list.Add(toWrite);
if (..condition1..)
this.queue1.Enqueue(tailIndex);
if (..condition2..)
this.queue2.Enqueue(tailIndex);
if (..condition3..)
this.queue3.Enqueue(tailIndex);
..etc..
} finally {
this.rwLock.ExitWriteLock();
}
}
そしてAccessItemsで:
public IEnumerable<object> AccessItems(int queueIndex) {
Queue<object> whichQueue = null;
switch (queueIndex) {
case 1: whichQueue = this.queue1; break;
case 2: whichQueue = this.queue2; break;
case 3: whichQueue = this.queue3; break;
..etc..
default: throw new NotSupportedException("Invalid queue disambiguating params");
}
List<object> results = new List<object>();
this.rwLock.EnterReadLock();
try {
foreach (var index in whichQueue)
results.Add(this.list[index]);
} finally {
this.rwLock.ExitReadLock();
}
return results;
}
そして、あなたのアプリがリストやさまざまなキューにアクセスするケースについての私の完全な理解に基づいて、それは 100% 安全であるはずです.
大編集終了
まず第一に、あなたが Thread-Safe と呼んでいるものは何ですか? エリック・リッパート著
あなたの特定のケースでは、答えはノーだと思います。
グローバル コンテキスト (実際のリスト) で矛盾が発生する可能性はありません。
代わりに、実際のリーダー (一意のライターと「衝突」する可能性が非常に高い) が、それ自体に矛盾が生じる可能性があります (独自のスタックの意味: すべてのメソッドのローカル変数、パラメーター、およびヒープの論理的に分離された部分)。 )))。
このような「スレッドごと」の不一致の可能性 (N 番目のスレッドはリスト内の要素の数を学習したいため、値が 39404999 であることがわかりますが、実際には 3 つの値しか追加されていません) は、一般的にそのアーキテクチャについて言えば、それを宣言するのに十分です。スレッドセーフではありません(ただし、グローバルにアクセス可能なリストを実際に変更することはありませんが、単に欠陥のある方法で読み取るだけです)。
ReaderWriterLockSlimクラスを使用することをお勧めします。あなたのニーズに合っていることがわかると思います:
private ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private List<Object> items;
private ConcurrentQueue<int> queue;
private Timer timer;
private void callback(object state)
{
int index = items.Count;
this.rwLock.EnterWriteLock();
try {
// in this place, right here, there can be only ONE writer
// and while the writer is between EnterWriteLock and ExitWriteLock
// there can exist no readers in the following method (between EnterReadLock
// and ExitReadLock)
// we add the item to the List
// AND do the enqueue "atomically" (as loose a term as thread-safe)
items.Add(new object());
if (true)//some condition here
queue.Enqueue(index);
} finally {
this.rwLock.ExitWriteLock();
}
timer.Change(TimeSpan.FromMilliseconds(500), TimeSpan.FromMilliseconds(-1));
}
//This can be called from any thread
public IEnumerable<object> AccessItems()
{
List<object> results = new List<object>();
this.rwLock.EnterReadLock();
try {
// in this place there can exist a thousand readers
// (doing these actions right here, between EnterReadLock and ExitReadLock)
// all at the same time, but NO writers
foreach (var index in queue)
{
this.results.Add ( this.items[index] );
}
} finally {
this.rwLock.ExitReadLock();
}
return results; // or foreach yield return you like that more :)
}