2

私は2つのスレッドを実行しているc#フォームを持っています.1つのスレッドはデータが入ってくるのをリッスンし、もう1つのスレッドはデータを処理しているので、それを使用できます. 何らかの理由で、プロセス スレッドが開始されると、リッスン スレッドは実行されなくなります。

Thread th1 = new Thread(new ThreadStart(zeroMQConn.Listen));
th1.Start();
Thread th2 = new Thread(() => ProcessData(zeroMQConn));
th2.Start();

これをデバッグすると、th1が開始され、次にth2が開始され、th1に戻ることはなく、データがnullに戻ります。

public void Listen()
    {
        while (true)
        {
            try
            {
                byte[] zmqBuffer = new byte[102400];
                int messageLength;
                lockForZMQ.EnterWriteLock();
                messageLength = socket.Receive(zmqBuffer);
                lockForZMQ.ExitWriteLock();
                byte[] message = new byte[messageLength];
                Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength);
                PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build();
                double Type = priceBook.GetPb(0).QuoteType;
                if (Type == 0.0)
                {
                    lockForList.EnterWriteLock();
                    CachedBidBooks = priceBook;
                    lockForList.ExitWriteLock();
                }
                else
                {
                    lockForList.EnterWriteLock();
                    CachedAskBooks = priceBook;
                    lockForList.ExitWriteLock();
                }
            }
            catch (ZmqException ex)
            {
                MessageBox.Show(ex.Message);
            }
        }
    }

 public void ProcessData(object connection)
    {
        while (true)
        {
            priceBookData = ((ZeroMQClass)connection).GetPriceBook();
        }

    }

 public List<PriceBookData> GetPriceBook()
    {
        List<PriceBookData> AskAndBid = new List<PriceBookData>();
        lockForList.EnterWriteLock();
        if (CachedAskBooks != null && CachedBidBooks != null)
        {
            AskAndBid.Add(CachedBidBooks);
            AskAndBid.Add(CachedAskBooks);
            CachedBidBooks = null;
            CachedAskBooks = null;
            lockForList.ExitWriteLock();
            return AskAndBid;
        }
        lockForList.ExitWriteLock();
        return null;
    }
4

1 に答える 1

4

ここにあるのは生産者と消費者のモデルですが、それらを適切に同期していません。問題は、処理の準備が整ったある種のバッファーまたはデータのコレクションではなく、単一の変数があり、その変数へのアクセスを完全に同期することです。これは、消費者が作業している間は生産者が作業できないことを意味します。

これBlockingCollection<T>は、プロデューサー/コンシューマー キューを扱うときはいつでも素晴らしいクラスです。

var queue = new BlockingCollection<PriceBookData>();

Task.Factory.StartNew(() =>
{
    while (true)
    {
        byte[] zmqBuffer = new byte[102400];
        int messageLength;
        socket.Receive(zmqBuffer);
        byte[] message = new byte[messageLength];
        Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength);
        PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build();
        double Type = priceBook.GetPb(0).QuoteType;
        queue.Add(priceBook);
    }
}, TaskCreationOptions.LongRunning);

Task.Factory.StartNew(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        //do stuff with item
    }
}, TaskCreationOptions.LongRunning);
于 2013-03-18T16:35:08.573 に答える