7

GUI 用のスレッドとソケット IO 用のワーカー スレッドを使用して Boost Asio パターンを実装したいと考えています。

ワーカー スレッドはboost::asio::io_service、ソケット クライアントの管理に使用されます。ソケットに対するすべての操作は、ワーカー スレッドによってのみ実行されます。

GUI スレッドは、ワーカー スレッドからメッセージを送受信する必要があります。

Boost Asio を使用してこのパターンを実装する方法を正確に理解することはできません。

標準の Asio の方法でソケット通信を既に実装しています (io_service.run()ワーカー スレッドから呼び出し、 async_read_some/を使用しますasync_send)。ワーカースレッドからのみ呼び出されるstrandsため、必要ありません。io_service.run()

今、クロス スレッド メッセージ キューを追加しようとしています。どうすれば実装できますか?

GUIスレッドからも取得runする必要がありますか?io_service

またはstrands、withを使用しpostて GUI スレッドからワーカー スレッドにメッセージをポストし (呼び出しio_service.run()io_service.poll_one()GUI スレッドから)、オペレーティング システムの GUI メッセージ ループを使用してワーカー スレッドから GUI スレッドにメッセージをポストする必要がありますか?

GUI スレッドからも呼び出す必要がある場合、2 つのスレッド間で共有されているため、ソケット操作で使用する必要がio_service.run()ありますか?io_service.poll_one()strandsio_service

編集: 私の質問を明確にするために、Boost Asio を使用してメッセージ キューを実装し、Boost Asio が機能しない場合にのみ他のライブラリに依存して、できる限りのことをしたいと思います。

4

3 に答える 3

1

2 つ以上のスレッド間でメッセージを交換する方法は、キューのようなコンテナーを使用してそこに格納し、イベントを使用してワーカー スレッドに通知し、それらを起動して処理することです。以下に例を示します。

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf)
{
   // This method creates a msg object and saves it in the SendMsgQ object.
   //
   Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf);
   SendMsgQ.Push(pMsg);
   // Signal the send worker thread to wake up and send the msg to the server.
   SetEvent(hEvent);
}

ヘッダー ファイル内:

std::queue<Message*> SendMsgQueue; // Queue of msgs to send to the server.

上記のコードは Microsoft VC++ 用です。開発環境が異なる場合は、別のクラスまたはメソッドを使用する必要がある場合があります。でも、考え方は同じはずです。

編集 - より完全なコード例

#include "StdAfx.h"
#include "SSLSocket.h"

boost::shared_ptr< boost::asio::io_service > SSLSocket::IOService;
bool SSLSocket::LobbySocketOpen = false;
SSLSocket* SSLSocket::pSSLLobby = 0;
int SSLSocket::StaticInit = 0;
Callback SSLSocket::CallbackFunction;
BufferManagement SSLSocket::BufMang;
volatile bool SSLSocket::ReqAlive = true;
Logger SSLSocket::Log;
HANDLE SSLSocket::hEvent;
bool SSLSocket::DisplayInHex;
ConcurrentMsgQueue SSLSocket::SendMsgQ;
bool SSLSocket::RcvThreadCreated = 0;
BufferManagement* Message::pBufMang;
bool SSLSocket::ShuttingDown = false;
std::vector<SSLSocket *> SocketList;

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex,
   const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0)
{
   // SSLSocket Constructor.
   // If the static members have not been intialized yet, then initialize them.
   LockCode = new Lock();
   if (!StaticInit)
   {
      SocketList.push_back(this);
      DisplayInHex = displayInHex;
      BufMang.Init(bufMangLen);
      Message::SetBufMang(&BufMang);
      // This constructor enables logging according to the vars passed in.
      Log.Init(logToFile, logToConsole, levelOfLog, logFileName);
      StaticInit = 1;
      hEvent = CreateEvent(NULL, false, false, NULL);
      // Define the ASIO IO service object.
      // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service);
      boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service);
      IOService = IOServ;
      pSSLLobby = this;
   }
}

SSLSocket::~SSLSocket(void)
{
   if (pSocket)
      delete pSocket;
   if (--StaticInit == 0)
      CloseHandle(hEvent);
}

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port)
{
   // Connects to the server.
   // serverPath - specifies the path to the server.  Can be either an ip address or url.
   // port - port server is listening on.
   //
   try
   {
      LockCode->Acquire(); // Single thread the code.
      // If the user has tried to connect before, then make sure everything is clean before trying to do so again.
      if (pSocket)
      {
         delete pSocket;
         pSocket = 0;
      }                                                                                                  
      // If serverPath is a URL, then resolve the address.
      if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr.
      {
         // Create the resolver and query objects to resolve the host name in serverPath to an ip address.
         boost::asio::ip::tcp::resolver resolver(*IOService);
         boost::asio::ip::tcp::resolver::query query(serverPath, port);
         boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query);
         // Set up an SSL context.
         boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client);
         // Specify to not verify the server certificiate right now.
         ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
         // Init the socket object used to initially communicate with the server.
         pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx);
         //
         // The thread we are on now, is most likely the user interface thread.  Create a thread to handle all incoming socket work messages.
         // Only one thread is created to handle the socket I/O reading and another thread is created to handle writing.
         if (!RcvThreadCreated)
         {
            WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this));
            RcvThreadCreated = true;
            WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this));
         }
         // Try to connect to the server.  Note - add timeout logic at some point.
         boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator,
            boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error));
      }
      else
      {
         // serverPath is an ip address, so try to connect using that.
         //
         stringstream ss1;
         boost::system::error_code EC;
         ss1 << "SSLSocket::Connect: Preparing to connect to game server " << serverPath << " : " << port << ".\n";
         Log.LogString(ss1.str(), LogInfo);
         // Create an endpoint with the specified ip address.
         const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath));
         int iport = atoi(port.c_str());
         const boost::asio::ip::tcp::endpoint EP(IP, iport);
         // Set up an SSL context.
         boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client);
         // Specify to not verify the server certificiate right now.
         ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
         // Init the socket object used to initially communicate with the server.
         pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx);
         //
         // Try to connect to the server.  Note - add timeout logic at some point.
         pSocket->next_layer().connect(EP, EC);
         if (EC)
         {
            // Log an error.  This worker thread should exit gracefully after this.
            stringstream ss;
            ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ".  Error: " << EC.message() + ".\n";
            Log.LogString(ss.str(), LogError);
         }
         stringstream ss;
         ss << "SSLSocket::Connect: Calling HandleConnect for game server " << serverPath << " : " << port << ".\n";
         Log.LogString(ss.str(), LogInfo);
         HandleConnect(EC);
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
   LockCode->Release();
}

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf)
{
   // This method creates a msg object and saves it in the SendMsgQ object.
   // sends the number of bytes specified by bytesInMsg in pBuf to the server.
   //
   Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf);
   SendMsgQ.Push(pMsg);
   // Signal the send worker thread to wake up and send the msg to the server.
   SetEvent(hEvent);
}


void SSLSocket::SendWorkerThread(SSLSocket* psSLS)
{
   // This thread method gets called to process the messages to be sent to the server.
   //
   // Since this has to be a static method, call a method on the class to handle server requests.
   psSLS->ProcessSendRequests();
}

void SSLSocket::ProcessSendRequests()
{
   // This method handles sending msgs to the server.
   //
   std::stringstream ss;
   DWORD WaitResult;
   Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo);
   // Loop until the user quits, or an error of some sort is thrown.
   try
   {
      do
      {
         // If there are one or more msgs that need to be sent to a server, then send them out.
         if (SendMsgQ.Count() > 0)
         {
            Message* pMsg = SendMsgQ.Front();
            SSLSocket* pSSL = pMsg->pSSL;
            SendMsgQ.Pop();
            const Byte* pBuf = pMsg->pBuf;
            const int BytesInMsg = pMsg->BytesInMsg;
            boost::system::error_code Error;
            LockCode->Acquire(); // Single thread the code.
            try
            {
               boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this,
                  boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
            }
            catch (std::exception& e)
            {
               stringstream ss;
               ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n";
               Log.LogString(ss.str(), LogError);
            }
            ss.str(std::string());
            ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n";
            Log.LogString(ss.str(), LogDebug2);
            Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3);
            LockCode->Release();
         }
         else
         {
            // Nothing to send, so go into a wait state.
            WaitResult = WaitForSingleObject(hEvent, INFINITE);
            if (WaitResult != 0L)
            {
               Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error.  Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError);
            }
         }
      } while (ReqAlive);
      Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called after a msg has been written out to the socket.  Nothing to do really since reading is handled by the HandleRead method.
   //
   std::stringstream ss;
   try
   {
      if (error)
      {
         ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS)
{
   // This is the method that gets called when the receive thread is created by this class.
   // This thread method focuses on processing messages received from the server.
   //
   // Since this has to be a static method, call an instance method on the class to handle server requests.
   psSLS->InitAsynchIO();
}

void SSLSocket::InitAsynchIO()
{
   // This method is responsible for initiating asynch i/o.
   boost::system::error_code Err;
   string s;
   stringstream ss;
   //
   try
   {
      ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n";
      Log.LogString(ss.str(), LogInfo);
      // Enable the handlers for asynch i/o.  The thread will hang here until the stop method has been called or an error occurs.
      // Add a work object so the thread will be dedicated to handling asynch i/o.
      boost::asio::io_service::work work(*IOService);
      IOService->run();
      Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleConnect(const boost::system::error_code& error)
{
   // This method is called asynchronously when the server has responded to the connect request.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         LockCode->Acquire(); // Single thread the code.
         pSocket->async_handshake(boost::asio::ssl::stream_base::client,
            boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error));
         LockCode->Release();
         ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n";
         Log.LogString(ss.str(), LogInfo);
      }
      else
      {
         // Log an error.  This worker thread should exit gracefully after this.
         ss << "SSLSocket::HandleConnect: connect failed.  Error: " << error.message() + ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleHandshake(const boost::system::error_code& error)
{
   // This method is called asynchronously when the server has responded to the handshake request.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         // Try to send the first message that the server is expecting.  This msg tells the server we want to connect.
         //
         unsigned char Msg[5] = {0x17, 0x00, 0x00, 0x00, 0x06};
         boost::system::error_code Err;
         //
         if (pSSLLobby == this)
            LobbySocketOpen = true;
         sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string();
         uiClientPort = pSocket->lowest_layer().remote_endpoint().port();
         ReqAlive = true;
         LockCode->Acquire(); // Single thread the code.
         int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(5), Err);
         if (Err)
         {
            ss << "SSLSocket::HandleHandshake: write failed - " << error.message() << ".\n";
            Log.LogString(ss.str(), LogInfo);
         }
         HandleFirstWrite(Err, Count);
         LockCode->Release();
         ss.str("");
         ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n";
      }
      else
      {
         ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n";
         IOService->stop();
      }
      Log.LogString(ss.str(), LogInfo);
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called after a msg has been written out to the socket.  This method is only called from HandleHandShake.
   std::stringstream ss;
   try
   {
      if (!error)
      {
         // Notify the UI that we are now connected.  Create a 6 byte msg for this.
         pDataBuf = BufMang.GetPtr(6);
         BYTE* p = pDataBuf;
         // Create msg type 500
         *p = 244;
         *++p = 1;
         CallbackFunction(this, 2, (void*)pDataBuf);
         // Get the 1st 4 bytes of the next msg, which is always the length of the msg.
         pDataBuf = BufMang.GetPtr(MsgLenBytes);
         try
         {
            boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this,
               boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
         }
         catch (std::exception& e)
         {
            stringstream ss;
            ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n";
            Log.LogString(ss.str(), LogError);
            Stop();
         }
      }
      else
      {
         ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n";
         Log.LogString(ss.str(), LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred)
{
   // This method is called to process an incoming message.
   //
   std::stringstream ss;
   int ByteCount;
   try
   {
      // ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n";
      // Log.LogString(ss.str(), LogInfo);
      // Set to exit this thread if the user is done.
      if (!ReqAlive)
      {
         // IOService->stop();
         return;
      }
      if (!error)
      {
         // Get the number of bytes in the message.
         if (bytesTransferred == 4)
         {
            ByteCount = BytesToInt(pDataBuf);
         }
         else
         {
            // Call the C# callback method that will handle the message.
            ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n";
            Log.LogString(ss.str(), LogDebug2);
            if (bytesTransferred > 0)
            {
               Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3);
               Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2);
               CallbackFunction(this, bytesTransferred, (void*)pDataBuf);
            }
            else
            {
               // # of bytes transferred = 0.  Don't do anything.
               bytesTransferred = 0; // For debugging.
            }
            // Prepare to read in the next message length.
            ByteCount = MsgLenBytes;
         }
         pDataBuf = BufMang.GetPtr(ByteCount);
         boost::system::error_code Err;
         try
         {
            boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead,
               this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
         }
         catch (std::exception& e)
         {
            stringstream ss;
            ss << "SSLSocket::HandleRead: threw this error - " << e.what() << ".\n";
            Log.LogString(ss.str(), LogError);
         }
      }
      else
      {
         Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError);
         Stop();
      }
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

void SSLSocket::Stop()
{
   // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on.  If this is not done, then an exception will be thrown
   // when it comes time to delete this object.
   //
   boost::system::error_code EC;
   try
   {
      // This method can be called from the handler as well.  So once the ShuttingDown flag is set, don't go throught the same code again.
      if (ShuttingDown)
         return;
      LockCode->Acquire(); // Single thread the code.
      if (!ShuttingDown)
      {
         ShuttingDown = true;
         pSocket->next_layer().cancel();
         pSocket->shutdown(EC);
         if (EC)
         {
            stringstream ss;
            ss << "SSLSocket::Stop: socket shutdown error - " << EC.message() << ".\n";
         }
         else
         {
            pSocket->next_layer().close();
         }
         delete pSocket;
         pSocket = 0;
         ReqAlive = false;
         SetEvent(hEvent);
         IOService->stop();
         LobbySocketOpen = false;
         WorkerThreads.join_all();
      }
      LockCode->Release();
      delete LockCode;
      LockCode = 0;
   }
   catch (std::exception& e)
   {
      stringstream ss;
      ss << "SSLSocket::Stop: threw an error - " << e.what() << ".\n";
      Log.LogString(ss.str(), LogError);
      Stop();
   }
}

したがって、キューを使用する必要があるかどうかについての質問への回答です。Xaqq へのコメントで、「2 つのスレッド間でメッセージを交換する必要がある」と述べました。したがって、コンテナをキューのように使用すると、メッセージを別のスレッドに渡して処理することができます。STL コンテナーが気に入らない場合、Boost にはいくつかの. 私の知る限り、アクセスできる Boost ASIO 内部コンテナーはありません。メッセージの保存と受け渡しは、コードで行う必要があります。

io_service::run の呼び出しに関する最後の注意事項。やるべき仕事がある間だけブロックします。このリンクを参照してください. 上記のサンプル コードでは、run メソッドが呼び出される前に作業項目が io_service オブジェクトに追加されるため、無期限にブロックされます。これが私が望んでいることです。本当に 1 つのスレッドだけが必要な場合は、作業オブジェクトを使用して run メソッドを呼び出すようにワーカー スレッドを設定して、スレッドが無期限にブロックされるようにすることもできます。これにより、サーバーとの間で送受信されるすべての非同期 I/O が処理されます。クラス内で、GUI がサーバーにデータを送信できるように、インターフェイス メソッドを 1 つまたは 2 つ記述します。これらのメソッドは、非同期書き込み .vs を使用できます。synch write メソッドを使用しているため、すぐに返されるため、GUI が長くブロックされることはありません。HandleWrite メソッドを記述する必要があります。私のコードはそれをあまり処理しません。エラーが発生した場合はログに記録するだけです。

于 2013-07-31T20:34:52.483 に答える
1

ワーカーが 1 つしかない場合は、かなり簡単です。

ASIO のハンドラは、 を呼び出しているスレッドによって実行されますio_service.run()。あなたの場合、それは 1 つのスレッド (ワーカー スレッド) だけがコールバック ハンドラを実行できることを意味します。したがって、ここではスレッド セーフについて心配する必要はありません。

GUI スレッドは、自分のソケットにアクセスできると仮定すると、boost::asio::async_write()問題なく呼び出すことができます。ただし、コールバック ハンドラはワーカー スレッドで実行されます。

私の経験から (確かに限られています)、次のパターンを使用しました。

  1. ビジネス ロジック スレッド (GUI スレッドである可能性があります) は、 を呼び出すことにより、クライアントの 1 つへの書き込みを簡単にスケジュールできますboost::asio::async_write()。ワーカー スレッドが処理します。
  2. ワーカー スレッドはいくつかboost::asio::async_read()の を開始し、「ビジネス ロジック パケット」を構築している可能性があります。ここで私が言いたいのは、生データから意味のあるメッセージ (カスタム クラスのサブクラスである可能Packet性があります) を構築するということです。Event
  3. そのようなメッセージを作成するのに十分なデータがワーカー スレッドにある場合、ワーカー スレッドは作成し、GUI スレッドがプルするスレッド セーフなキューにそのメッセージをエンキューします。
  4. 次に、GUI (またはビジネス ロジック) スレッドがメッセージを処理します。

不明な点がありましたらお知らせください。お役に立てれば幸いです。

于 2013-07-31T17:43:14.867 に答える