0

メッセージを処理するためにネットワークからメッセージを受信するコンソール アプリケーションを作成しています。最初に、シングルトン クラスを作成して、すべてのクラスから同じキューにアクセスできるようにしました。このクラスはProcessingQueue.

public class ProcessingQueue
{
    public class ItemToProcess
    {
        public string SourceClientId { get; set; }
        public IMessage ReceivedMessage { get; set; }
    }

    private int m_MaxSize = 20;
    private Queue<ItemToProcess> m_InternalQueue;

    private static volatile ProcessingQueue m_Instance = null;
    private static readonly object syncRoot = new object();

    private ProcessingQueue()
    {
        m_InternalQueue = new Queue<ItemToProcess>();
    }

    public static ProcessingQueue Instance
    {
        get
        {
            if (m_Instance == null)
            {
                lock (syncRoot)
                {
                    if (m_Instance == null)
                    {
                        m_Instance = new ProcessingQueue();
                    }
                }
            }
            return m_Instance;
        }
    }

    public int MaxSize
    {
        get
        {
            lock (syncRoot)
            {
                return m_MaxSize;
            }
        }
        set
        {
            if (value > 0)
            {
                lock (syncRoot)
                {
                    m_MaxSize = value;
                }
            }
        }
    }

    public void Enqueue(String source, IMessage message)
    {
        lock (syncRoot)
        {
            while (m_InternalQueue.Count >= m_MaxSize)
            {
                Monitor.Wait(syncRoot);
            }
            m_InternalQueue.Enqueue(new ItemToProcess { SourceClientId = source, ReceivedMessage = message });
            if (m_InternalQueue.Count == 1)
            {
                Monitor.PulseAll(syncRoot);
            }
        }
    }

    public ItemToProcess Dequeue()
    {
        lock (syncRoot)
        {
            while (m_InternalQueue.Count == 0)
            {
                Monitor.Wait(syncRoot);
            }
            ItemToProcess item = m_InternalQueue.Dequeue();
            if (m_InternalQueue.Count == m_MaxSize - 1)
            {
                Monitor.PulseAll(syncRoot);
            }
            return item;
        }
    }

    public int Count
    {
        get
        {
            lock (syncRoot)
            {
                return m_InternalQueue.Count;
            }
        }
    }
}

次に、プロジェクトのメインクラスを次のように実装しました。

  1. まず、共有キューがインスタンス化されます。
  2. 次に、キープアライブ メッセージの到着をシミュレートするタイマーを設定します (最初のプロデューサー)。
  3. 次に、コンシューマー スレッド (processingオブジェクト) を作成しました。
  4. 次に、別のプロデューサー スレッド (generatingオブジェクト) を作成しました。
  5. 最後に、すべてのスレッドとタイマーを実行します。

    クラス プログラム { static ProcessingQueue キュー = ProcessingQueue.Instance; static System.Timers.Timer keep_alive_timer = new System.Timers.Timer(10000);

    private static volatile bool running = true;
    
    
    static void Main(string[] args)
    {
        queue.MaxSize = 30;
        keep_alive_timer.Elapsed += new ElapsedEventHandler(delegate(object sender, ElapsedEventArgs e)
        {
            KeepAliveMessage msg = new KeepAliveMessage(Guid.NewGuid());
            Console.WriteLine("Keep Alive: " + msg.MsgId);
            queue.Enqueue("", msg);
        });
        keep_alive_timer.Enabled = true;
        keep_alive_timer.AutoReset = true;
    
        Thread processing = new Thread(delegate()
        {
            while (running)
            {
                Console.WriteLine("Number of elements in queue: {0}", queue.Count);
    
                ProcessingQueue.ItemToProcess msg = queue.Dequeue();
                Console.WriteLine("Processed: msgid={0}, source={1};", msg.ReceivedMessage.MsgId, msg.SourceClientId);
    
                Thread.Sleep(1500);
            }
        });
    
        Thread generating = new Thread(MessagesFromNetwork);
    
        processing.Start();
        keep_alive_timer.Start();
        generating.Start();
    
        Console.WriteLine("RUNNING...\n");
        Console.ReadLine();
    
        running = false;
        keep_alive_timer.Stop();
        Console.WriteLine("CLOSING...\n");
    
        //processing.Abort();
        //generating.Abort();
    
        bool b1 = processing.Join(TimeSpan.FromSeconds(5));
        bool b2 = generating.Join(TimeSpan.FromSeconds(5));
    
        Console.WriteLine("b1 {0}", b1);
        Console.WriteLine("b2 {0}", b2);
        Console.WriteLine("END");
        Console.ReadLine();
    }
    
    static void MessagesFromNetwork()
    {
        string[] sourceClients = { "1", "2", "3", "4", "5" };
        while (running)
        {
            IMessage msg; // interface IMessage
            Random random = new Random();
            int type = random.Next(2);
            switch (type)
            {
                case 0:
                    msg = new KeepAliveMessage(Guid.NewGuid());   // implements IMessage
                    break;
                case 1:
                    msg = new TaskMessage(Guid.NewGuid(), ...);   // implements IMessage
                    break;
                default:
                    throw new Exception("Messaggio non valido!");
            }
            Console.WriteLine("New message received: " + msg.MsgId);
            queue.Enqueue(sourceClients[random.Next(sourceClients.Length)], msg);
            Console.WriteLine("... message enqueued: " + msg.MsgId);
            Thread.Sleep(500);
        }
    }
    

    }

実行中にEnter キーを押すと、running変数が false になり、両方のスレッドが終了するはずです。ただし、これは常に発生するとは限りません。実際には、2 つのメソッドのうちの 1 つがJoinコントロールを返しませんでした。このため、Joinメソッド内でタイムアウトを指定しましたがConsole.WriteLine("END");、コンソール アプリケーションがフリーズした後 (2 番目のメソッドは をJoin返しますfalse)。

2 番目のスレッドが正しく終了していない可能性があります...なぜですか?

4

1 に答える 1

1

Dequeue または Enqueue が に入る可能性があるようMonitor.Wait()です。実行が停止すると、誰もパルスされません。

5 秒待ちますが、MaxSize * 1500 > 5000 であることに注意してください

タイマーの頻度を直接見つけることはできませんでした。

于 2012-07-01T22:53:39.513 に答える