0

複数のプロデューサーと 1 つのコンシューマーを使用してコンカレント プロデューサー/コンシューマー キューを実装しようとしています。プロデューサーは にデータを追加しQueue、コンシューマーはこれらのデータをキューから取り出してコレクションを更新します。このコレクションは定期的に新しいファイルにバックアップする必要があります。この目的のために、カスタムのシリアライズ可能なコレクションを作成しました。シリアライズは、DataContractSerializer.

  • キューはコンシューマーとプロデューサーの間でのみ共有されるため、このキューへのアクセスは競合状態を回避するために管理する必要があります。
  • カスタム コレクションは、コンシューマーとバックアップ スレッドの間で共有されます。
  • バックアップ スレッドは、System.Threading.Timerオブジェクトを使用して定期的にアクティブ化できます。最初にコンシューマーによってスケジュールされ、その後、すべてのバックアップ手順の最後にスケジュールされます。
  • 最後に、shutdown メソッドはプロデューサーによるキューイングを停止し、次にコンシューマーを停止し、最後のバックアップを実行してタイマーを破棄する必要があります。

一度にアイテムをデキューするのは効率的ではない可能性があるため、2 つのキューを使用することを考えました。最初のキューがいっぱいになると、プロデューサーは を呼び出してコンシューマーに通知しMonitor.Pulseます。コンシューマーが通知を受け取るとすぐにキューが交換されるため、プロデューサーが新しいアイテムをキューに入れている間、コンシューマーは前のアイテムを処理できます。

私が書いたサンプルは正常に動作するようです。スレッドセーフでもあると思いますが、それについてはわかりません。次のコードでは、簡単にするためにQueue<int>. また、(簡単にするために)ArrayListシリアル化可能なコレクションの代わりに も使用しました。

public class QueueManager
{
    private readonly int m_QueueMaxSize;
    private readonly TimeSpan m_BackupPeriod;

    private readonly object m_SyncRoot_1 = new object();
    private Queue<int> m_InputQueue = new Queue<int>();
    private bool m_Shutdown;
    private bool m_Pulsed;

    private readonly object m_SyncRoot_2 = new object();
    private ArrayList m_CustomCollection = new ArrayList();

    private Thread m_ConsumerThread;
    private Timer m_BackupThread;
    private WaitHandle m_Disposed;

    public QueueManager()
    {
        m_ConsumerThread = new Thread(Work) { IsBackground = true };

        m_QueueMaxSize = 7;
        m_BackupPeriod = TimeSpan.FromSeconds(30);
    }

    public void Run()
    {
        m_Shutdown = m_Pulsed = false;

        m_BackupThread = new Timer(DoBackup);
        m_Disposed = new AutoResetEvent(false);

        m_ConsumerThread.Start();
    }

    public void Shutdown()
    {
        lock (m_SyncRoot_1)
        {
            m_Shutdown = true;
            Console.WriteLine("Worker shutdown...");

            Monitor.Pulse(m_SyncRoot_1);
        }

        m_ConsumerThread.Join();
        WaitHandle.WaitAll(new WaitHandle[] { m_Disposed });

        if (m_InputQueue != null) { m_InputQueue.Clear(); }
        if (m_CustomCollection != null) { m_CustomCollection.Clear(); }

        Console.WriteLine("Worker stopped!");
    }

    public void Enqueue(int item)
    {
        lock (m_SyncRoot_1)
        {
            if (m_InputQueue.Count == m_QueueMaxSize)
            {
                if (!m_Pulsed)
                {
                    Monitor.Pulse(m_SyncRoot_1); // it notifies the consumer...
                    m_Pulsed = true;
                }
                Monitor.Wait(m_SyncRoot_1);  // ... and waits for Pulse
            }

            m_InputQueue.Enqueue(item);
            Console.WriteLine("{0}    \t {1} >", Thread.CurrentThread.Name, item.ToString("+000;-000;"));
        }
    }

    private void Work()
    {
        m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1));

        Queue<int> m_SwapQueueRef, m_WorkerQueue = new Queue<int>();

        Console.WriteLine("Worker started!");
        while (true)
        {
            lock (m_SyncRoot_1)
            {
                if (m_InputQueue.Count < m_QueueMaxSize && !m_Shutdown) Monitor.Wait(m_SyncRoot_1);

                Console.WriteLine("\nswapping...");
                m_SwapQueueRef = m_InputQueue;
                m_InputQueue = m_WorkerQueue;
                m_WorkerQueue = m_SwapQueueRef;

                m_Pulsed = false;
                Monitor.PulseAll(m_SyncRoot_1); // all producers are notified
            }

            Console.WriteLine("Worker\t      < {0}", String.Join(",", m_WorkerQueue.ToArray()));

            lock (m_SyncRoot_2)
            {
                Console.WriteLine("Updating custom dictionary...");
                foreach (int item in m_WorkerQueue)
                {
                    m_CustomCollection.Add(item);
                }
                Thread.Sleep(1000);
                Console.WriteLine("Custom dictionary updated successfully!");
            }

            if (m_Shutdown)
            {
                // schedule last backup
                m_BackupThread.Change(0, Timeout.Infinite);
                return;
            }
            m_WorkerQueue.Clear();
        }
    }

    private void DoBackup(object state)
    {
        try
        {
            lock (m_SyncRoot_2)
            {
                Console.WriteLine("Backup...");
                Thread.Sleep(2000);
                Console.WriteLine("Backup completed at {0}", DateTime.Now);
            }
        }
        finally
        {
            if (m_Shutdown) { m_BackupThread.Dispose(m_Disposed); }
            else { m_BackupThread.Change(m_BackupPeriod, TimeSpan.FromMilliseconds(-1)); }
        }
    }
}

以下のコードに示すように、一部のオブジェクトはメソッドで初期化され、停止後Runに再開できるようになっています。QueueManager

public static void Main(string[] args)
{
    QueueManager queue = new QueueManager();

    var t1 = new Thread(() =>
    {
        for (int i = 0; i < 50; i++)
        {
            queue.Enqueue(i);
            Thread.Sleep(1500);
        }
    }) { Name = "t1" };

    var t2 = new Thread(() =>
    {
        for (int i = 0; i > -30; i--)
        {
            queue.Enqueue(i);
            Thread.Sleep(3000);
        }
    }) { Name = "t2" };

    t1.Start(); t2.Start(); queue.Run();
    t1.Join(); t2.Join(); queue.Shutdown();
    Console.ReadLine();

    var t3 = new Thread(() =>
    {
        for (int i = 0; i < 50; i++)
        {
            queue.Enqueue(i);
            Thread.Sleep(1000);
        }
    }) { Name = "t3" };

    var t4 = new Thread(() =>
    {
        for (int i = 0; i > -30; i--)
        {
            queue.Enqueue(i);
            Thread.Sleep(2000);
        }
    }) { Name = "t4" };

    t3.Start(); t4.Start(); queue.Run();
    t3.Join(); t4.Join(); queue.Shutdown();
    Console.ReadLine();
}
4

1 に答える 1

2

BlockingCollectionをプロデューサー/コンシューマー キューに使用することをお勧めします。その目的のために特別に設計されました。生産者は を使用して項目を追加しAdd、消費者は を使用しますTake。取るアイテムがない場合は、アイテムが追加されるまでブロックされます。マルチスレッド環境で使用するように設計されているため、これらのメソッドを使用するだけであれば、ロックやその他の同期コードを明示的に使用する必要はありません。

于 2012-09-14T01:36:23.480 に答える