複数のプロデューサーと 1 つのコンシューマーを使用してコンカレント プロデューサー/コンシューマー キューを実装しようとしています。プロデューサーは にデータを追加しQueue
、コンシューマーはこれらのデータをキューから取り出してコレクションを更新します。このコレクションは定期的に新しいファイルにバックアップする必要があります。この目的のために、カスタムのシリアライズ可能なコレクションを作成しました。シリアライズは、DataContractSerializer
.
- キューはコンシューマーとプロデューサーの間でのみ共有されるため、このキューへのアクセスは競合状態を回避するために管理する必要があります。
- カスタム コレクションは、コンシューマーとバックアップ スレッドの間で共有されます。
- バックアップ スレッドは、
System.Threading.Timer
オブジェクトを使用して定期的にアクティブ化できます。最初にコンシューマーによってスケジュールされ、その後、すべてのバックアップ手順の最後にスケジュールされます。 - 最後に、shutdown メソッドはプロデューサーによるキューイングを停止し、次にコンシューマーを停止し、最後のバックアップを実行してタイマーを破棄する必要があります。
一度にアイテムをデキューするのは効率的ではない可能性があるため、2 つのキューを使用することを考えました。最初のキューがいっぱいになると、プロデューサーは を呼び出してコンシューマーに通知しMonitor.Pulse
ます。コンシューマーが通知を受け取るとすぐにキューが交換されるため、プロデューサーが新しいアイテムをキューに入れている間、コンシューマーは前のアイテムを処理できます。
私が書いたサンプルは正常に動作するようです。スレッドセーフでもあると思いますが、それについてはわかりません。次のコードでは、簡単にするためにQueue<int>
. また、(簡単にするために)ArrayList
シリアル化可能なコレクションの代わりに も使用しました。
public class QueueManager
{
private readonly int m_QueueMaxSize;
private readonly TimeSpan m_BackupPeriod;
private readonly object m_SyncRoot_1 = new object();
private Queue<int> m_InputQueue = new Queue<int>();
private bool m_Shutdown;
private bool m_Pulsed;
private readonly object m_SyncRoot_2 = new object();
private ArrayList m_CustomCollection = new ArrayList();
private Thread m_ConsumerThread;
private Timer m_BackupThread;
private WaitHandle m_Disposed;
public QueueManager()
{
m_ConsumerThread = new Thread(Work) { IsBackground = true };
m_QueueMaxSize = 7;
m_BackupPeriod = TimeSpan.FromSeconds(30);
}
public void Run()
{
m_Shutdown = m_Pulsed = false;
m_BackupThread = new Timer(DoBackup);
m_Disposed = new AutoResetEvent(false);
m_ConsumerThread.Start();
}
public void Shutdown()
{
lock (m_SyncRoot_1)
{
m_Shutdown = true;
Console.WriteLine("Worker shutdown...");
Monitor.Pulse(m_SyncRoot_1);
}
m_ConsumerThread.Join();
WaitHandle.WaitAll(new WaitHandle[] { m_Disposed });
if (m_InputQueue != null) { m_InputQueue.Clear(); }
if (m_CustomCollection != null) { m_CustomCollection.Clear(); }
Console.WriteLine("Worker stopped!");
}
public void Enqueue(int item)
{
lock (m_SyncRoot_1)
{
if (m_InputQueue.Count == m_QueueMaxSize)
{
if (!m_Pulsed)
{
Monitor.Pulse(m_SyncRoot_1); // it notifies the consumer...
m_Pulsed = true;
}
Monitor.Wait(m_SyncRoot_1); // ... and waits for Pulse
}
m_InputQueue.Enqueue(item);
Console.WriteLine("{0} \t {1} >", Thread.CurrentThread.Name, item.ToString("+000;-000;"));
}
}
private void Work()
{
m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1));
Queue<int> m_SwapQueueRef, m_WorkerQueue = new Queue<int>();
Console.WriteLine("Worker started!");
while (true)
{
lock (m_SyncRoot_1)
{
if (m_InputQueue.Count < m_QueueMaxSize && !m_Shutdown) Monitor.Wait(m_SyncRoot_1);
Console.WriteLine("\nswapping...");
m_SwapQueueRef = m_InputQueue;
m_InputQueue = m_WorkerQueue;
m_WorkerQueue = m_SwapQueueRef;
m_Pulsed = false;
Monitor.PulseAll(m_SyncRoot_1); // all producers are notified
}
Console.WriteLine("Worker\t < {0}", String.Join(",", m_WorkerQueue.ToArray()));
lock (m_SyncRoot_2)
{
Console.WriteLine("Updating custom dictionary...");
foreach (int item in m_WorkerQueue)
{
m_CustomCollection.Add(item);
}
Thread.Sleep(1000);
Console.WriteLine("Custom dictionary updated successfully!");
}
if (m_Shutdown)
{
// schedule last backup
m_BackupThread.Change(0, Timeout.Infinite);
return;
}
m_WorkerQueue.Clear();
}
}
private void DoBackup(object state)
{
try
{
lock (m_SyncRoot_2)
{
Console.WriteLine("Backup...");
Thread.Sleep(2000);
Console.WriteLine("Backup completed at {0}", DateTime.Now);
}
}
finally
{
if (m_Shutdown) { m_BackupThread.Dispose(m_Disposed); }
else { m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1)); }
}
}
}
以下のコードに示すように、一部のオブジェクトはメソッドで初期化され、停止後Run
に再開できるようになっています。QueueManager
public static void Main(string[] args)
{
QueueManager queue = new QueueManager();
var t1 = new Thread(() =>
{
for (int i = 0; i < 50; i++)
{
queue.Enqueue(i);
Thread.Sleep(1500);
}
}) { Name = "t1" };
var t2 = new Thread(() =>
{
for (int i = 0; i > -30; i--)
{
queue.Enqueue(i);
Thread.Sleep(3000);
}
}) { Name = "t2" };
t1.Start(); t2.Start(); queue.Run();
t1.Join(); t2.Join(); queue.Shutdown();
Console.ReadLine();
var t3 = new Thread(() =>
{
for (int i = 0; i < 50; i++)
{
queue.Enqueue(i);
Thread.Sleep(1000);
}
}) { Name = "t3" };
var t4 = new Thread(() =>
{
for (int i = 0; i > -30; i--)
{
queue.Enqueue(i);
Thread.Sleep(2000);
}
}) { Name = "t4" };
t3.Start(); t4.Start(); queue.Run();
t3.Join(); t4.Join(); queue.Shutdown();
Console.ReadLine();
}