私は4つのスレッドを持っています。1つは、ネットワークからいくつかの情報を読み取り、それを変数に書き込み、各部分の後に信号を送る必要があります。それらのうちの3つはこの変数を読み取っていて、正確に1回読み取る必要があります。現在の解決策は、ライターがイベントを書き込んだ後にイベントを設定し、リーダーのイベントを待つことです。読者はイベントを待ってから、イベントを読んで設定します(つまり、読んだことを意味します)。問題は、読者が複数回読むことができ、私がそれらに重複していることです。読者が1回だけ読むというルールをどのように達成できますか?
5 に答える
これを実装する1つの方法は次のとおりです
データは、単一リンクリストとしてスレッド間で共有されます。リスト内のすべてのノードは、マーカーにすることも、データを持つこともできます。リストは、マーカーに入力される単一のノードとして始まります。データが読み取られると、一連のデータノードとそれに続くマーカーを持つ新しいリストが形成されます。このリストは、リストに追加された最新のマーカーに追加されます。
すべてのリーダースレッドは、元のマーカーノードと。への参照で始まりますAutoResetEvent
。新しいデータがライターに入るとAutoResetEvent
、すべてのリーダースレッドに信号が送られます。リーダースレッドは、Nextノードのないマーカーが見つかるまで単純に歩きます。
このスキームにより、すべてのリーダーがデータを1回だけ表示できるようになります。最大の問題は、リストをロックなしで読み書きできるようにリストを作成することです。これはかなり簡単ですInterlocked.CompareExchange
が
リンクリストタイプ
class Node<T> {
public bool IsMarker;
public T Data;
public Node<T> Next;
}
サンプルライタータイプ
class Writer<T> {
private List<AutoResetEvent> m_list;
private Node<T> m_lastMarker;
public Writer(List<AutoResetEvent> list, Node<T> marker) {
m_lastMarker = marker;
m_list = list;
}
// Assuming this can't overlap. If this can overload then you will
// need synchronization in this method around the writing of
// m_lastMarker
void OnDataRead(T[] items) {
if (items.Length == 0) {
return;
}
// Build up a linked list of the new data followed by a
// Marker to signify the end of the data.
var head = new Node<T>() { Data = items[0] };
var current = head;
for (int i = 1; i < items.Length; i++) {
current.Next = new Node<T>{ Data = items[i] };
current = current.Next;
}
var marker = new Node<T> { IsMarker = true };
current.Next = marker;
// Append the list to the end of the last marker node the writer
// created
m_lastMarker.Next = head;
m_lastMarker = marker;
// Tell each of the readers that there is new data
foreach (var e in m_list) {
e.Set();
}
}
}
サンプルリーダータイプ
class Reader<T> {
private AutoResetEvent m_event;
private Node<T> m_marker;
void Go() {
while(true) {
m_event.WaitOne();
var current = m_marker.Next;
while (current != null) {
if (current.IsMarker) {
// Found a new marker. Always record the marker because it may
// be the last marker in the chain
m_marker = current;
} else {
// Actually process the data
ProcessData(current.Data);
}
current = current.Next;
}
}
}
}
同じ値を複数回取得する可能性を受け入れるようにコンシューマースレッドをコーディングする必要があるというコメントに同意します。おそらく、それを行う最も簡単な方法は、各更新に順次識別子を追加することです。このようにして、スレッドはシーケンシャルIDを最後に読み取ったIDと比較し、重複しているかどうかを知ることができます。
また、値を逃したかどうかもわかります。
ただし、本当にロックステップで値を取得する必要がある場合は、2つのオブジェクトと1つのManualResetEvent
オブジェクトを使用することをお勧めしますCountdownEvent
。使い方は次のとおりです。
ManualResetEvent DataReadyEvent = new ManualResetEvent();
ManualResetEvent WaitForResultEvent = new ManualResetEvent();
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);
リーダースレッドはを待機しDataReadyEvent
ます。
他のスレッドがネットワークから値を読み取ると、次のようになります。
Acknowledgement.Reset(NumWaitingThreads);
DataReadyEvent.Set(); // signal waiting threads to process
Acknowledgement.WaitOne(); // wait for all threads to signal they got it.
DataReadyEvent.Reset(); // block threads' reading
WaitForResultEvent.Set(); // tell threads they can continue
待機中のスレッドはこれを行います:
DataReadyEvent.WaitOne(); // wait for value to be available
// read the value
Acknowledgement.Set(); // acknowledge receipt
WaitForResultEvent.WaitOne(); // wait for signal to proceed
これは、待機中のスレッドごとに2つのイベントを発生させるのと同じ効果がありますが、はるかに単純です。
ただし、スレッドがクラッシュした場合、カウントダウンイベントでハングするという欠点があります。ただし、プロデューサースレッドがすべてのスレッドメッセージを待機している場合は、メソッドも同様になります。
これは、バリアクラスに最適です。
2つを使用Barriers
して、2つの状態間でフリップフロップを実行できます。
次に例を示します。
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
int readerCount = 4;
Barrier barrier1 = new Barrier(readerCount + 1);
Barrier barrier2 = new Barrier(readerCount + 1);
for (int i = 0; i < readerCount; ++i)
{
Task.Factory.StartNew(() => reader(barrier1, barrier2));
}
while (true)
{
barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.
if ((value % 10000) == 0) // Print message every so often.
Console.WriteLine(value);
barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
++value; // Produce the next value.
}
}
private static void reader(Barrier barrier1, Barrier barrier2)
{
int expected = 0;
while (true)
{
barrier1.SignalAndWait(); // Wait for "new data available".
if (value != expected)
{
Console.WriteLine("Expected " + expected + ", got " + value);
}
++expected;
barrier2.SignalAndWait(); // Signal that we've read the data, and wait for all other threads.
}
}
private static volatile int value;
}
}
ConcurrentQueueをお勧めします-各スレッドがキューから一意のインスタンスを取得することを保証します。これがその使い方の良い説明です。
ConnurrentQueue<T>.TryDequeue()
キューが空でないかどうか、およびキューからアイテムを取得していないかどうかをチェックするスレッドセーフなメソッドです。両方の操作を同時に実行するため、プログラマーは競合状態について心配する必要はありません。
私は行く方法を見つけたと思います。AutoResetEventの2つの配列を作成し、すべてのリーダーに2つのイベントがあり、書き込みイベントを待機して読み取りイベントを設定し、ライターがすべての書き込みイベントを設定してすべての読み取りイベントを待機します。
JaredPar、あなたの答えは役に立ち、私を助けてくれました