ネットワークからパケットを取得し、それらを1つのスレッドでエンキューしてから、このパケットを別のスレッドで消費(デキュー)するだけです。
そこで、ブーストライブラリを使用して https://www.quantnet.com/cplusplus-multithreading-boost/に基づいて共有キューを作成することにしました。
template <typename T>
class SynchronisedQueue
{
private:
std::queue<T> m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond;// The condition to wait for
public:
// Add data to the queue and notify others
void Enqueue(const T& data)
{
// Acquire lock on the queue
boost::unique_lock<boost::mutex> lock(m_mutex);
// Add the data to the queue
m_queue.push(data);
// Notify others that data is ready
m_cond.notify_one();
} // Lock is automatically released here
// Get data from the queue. Wait for data if not available
T Dequeue()
{
// Acquire lock on the queue
boost::unique_lock<boost::mutex> lock(m_mutex);
// When there is no data, wait till someone fills it.
// Lock is automatically released in the wait and obtained
// again after the wait
while (m_queue.size()==0) m_cond.wait(lock);
// Retrieve the data from the queue
T result=m_queue.front(); m_queue.pop();
return result;
} // Lock is automatically released here
};
問題は、データを取得していないときにDequeue()メソッドがコンシューマースレッドをブロックし、コンシューマースレッドを終了したいときに終了または停止できない場合があることです。
パケットを消費するスレッドを安全に終了できるように、Dequeue()のブロックを終了するための推奨される方法は何ですか?アイデアの提案はありますか?
PS:サイト https://www.quantnet.com/cplusplus-multithreading-boost/ use "boost :: this_thread ::interruption_point();" コンシューマースレッドを停止するために...私のレガシーコード構造のため、これは私には不可能です...
回答に基づいて、共有キューを次のように更新します。
#include <queue>
#include <boost/thread.hpp>
template <typename T>
class SynchronisedQueue
{
public:
SynchronisedQueue()
{
RequestToEnd = false;
EnqueueData = true;
}
void Enqueue(const T& data)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
if(EnqueueData)
{
m_queue.push(data);
m_cond.notify_one();
}
}
bool TryDequeue(T& result)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
while (m_queue.empty() && (! RequestToEnd))
{
m_cond.wait(lock);
}
if( RequestToEnd )
{
DoEndActions();
return false;
}
result= m_queue.front(); m_queue.pop();
return true;
}
void StopQueue()
{
RequestToEnd = true;
Enqueue(NULL);
}
int Size()
{
boost::unique_lock<boost::mutex> lock(m_mutex);
return m_queue.size();
}
private:
void DoEndActions()
{
EnqueueData = false;
while (!m_queue.empty())
{
m_queue.pop();
}
}
std::queue<T> m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond; // The condition to wait for
bool RequestToEnd;
bool EnqueueData;
};
そして、これが私のテストドライブです:
#include <iostream>
#include <string>
#include "SynchronisedQueue.h"
using namespace std;
SynchronisedQueue<int> MyQueue;
void InsertToQueue()
{
int i= 0;
while(true)
{
MyQueue.Enqueue(++i);
}
}
void ConsumeFromQueue()
{
while(true)
{
int number;
cout << "Now try to dequeue" << endl;
bool success = MyQueue.TryDequeue(number);
if(success)
{
cout << "value is " << number << endl;
}
else
{
cout << " queue is stopped" << endl;
break;
}
}
cout << "Que size is : " << MyQueue.Size() << endl;
}
int main()
{
cout << "Test Started" << endl;
boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);
boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds
MyQueue.StopQueue();
int endMain;
cin >> endMain;
return 0;
}
今のところそれはうまくいくようです...新しい提案に基づいて:
停止方法を次のように変更します。
void StopQueue()
{
boost::unique_lock<boost::mutex> lock(m_mutex);
RequestToEnd = true;
m_cond.notify_one();
}