2

安全な websocket boost::beast 実装を使用しています。現在のメッセージが処理されている間に新しいメッセージを受信できるようにしたいと考えています。そこで、コルーチン (yield を使用したメソッド) を使用して試してみることにしました。

  1. これは私の websocket オブジェクトです:
using Websocket = boost::beast::websocket::stream<
      boost::beast::ssl_stream<boost::beast::tcp_stream>>;

std::optional<Websocket> ws_;
  1. そして、これが私のwebsocketリスナーコードの呼び出し方です
ws_.reset();
boost::asio::spawn(ioc_,
    [=](const boost::asio::yield_context &yield) {
        this->startAndServeWs(yield);
  });
}
  1. そして、これが私の websocket ハンドラー メソッドのしくみです。これは 2 つの部分に分かれていることに注意してください。

    まずは初期化部分。

    次に、新しいメッセージを読み取る準備ができている websocket メインループ。


したがって、私の質問は、現在のメッセージを処理しながら、以下のこのコードがサーバーから新しいメッセージを取得するのに適しているかどうかです ( sendPostFetchItemshttpsendPostDownloadNewVersionポスト要求をトリガーしてサーバーの応答を待つため、時間がかかる場合があります)。そうでない場合は、新しいメッセージがキューに入れられ、現在のメッセージハンドルが完了する次の反復を待っていると想定できますか?

私の 2 番目の質問は、catch ステートメントに関するもので、これが接続を再試行する適切な方法であるかどうかです。

void Comm::startAndServeWs(const boost::asio::yield_context &yield) {
  try {
 
    // webSocet init according to ssl context and io_context.
    ws_.emplace(ioc_, ctx_);

    boost::beast::get_lowest_layer(ws_.value())
        .expires_after(std::chrono::seconds(30));

    boost::asio::ip::tcp::resolver resolver(io_context_);

    auto const results =
        resolver.async_resolve(ip_.value(), port_.value(), yield);

    auto ep = boost::beast::get_lowest_layer(ws_.value())
                  .async_connect(results, yield);

    // Set SNI Hostname (many hosts need this to handshake successfully)
    if (!SSL_set_tlsext_host_name(ws_.value().next_layer().native_handle(),
                                  ip_.value().c_str())) {
      throw("Failed to set SNI Hostname");
    }

    // Update the host_ string. This will provide the value of the
    // Host HTTP header during the WebSocket handshake.
    // Se  e https://tools.ietf.org/html/rfc7230#section-5.4
    auto address = ip_.value() + std::to_string(ep.port());

    // Perform the SSL handshake
    ws_.value().next_layer().handshake(boost::asio::ssl::stream_base::client);

    // Turn off the timeout on the tcp_stream, because
    // the websocket stream has its own timeout system.
    boost::beast::get_lowest_layer(ws_.value()).expires_never();

    // Set suggested timeout settings for the websocket
    ws_.value().set_option(
        boost::beast::websocket::stream_base::timeout::suggested(
            boost::beast::role_type::client));

    // Set a decorator to change the User-Agent of the handshake
    ws_.value().set_option(boost::beast::websocket::stream_base::decorator(
        [](boost::beast::websocket::request_type &req) {
          req.set(boost::beast::http::field::user_agent, kWebsocketIdentifier);
        }));

    Log("trying to establish websocket in address {}", address);
            
    ws_.value().async_handshake(address, "/ws", yield);


//////////////////// here's the websocket main loop.

    for (;;) {
      boost::beast::flat_buffer buffer;
      // Read a message into our buffer
--->  ws_.value().async_read(buffer, yield);
      
      Log("websocket response buffer = {}",boost::beast::make_printable(buffer.data()));

      try {
        nlohmann::json response = nlohmann::json::parse(s);

        if (response["command"] == "fetchItems") {
          sendPostFetchItems();
        } else if (response["command"] == "getLogs") {
          sendPostDownloadNewVersion();
        }... 
    }

  } catch (std::exception &e) {
    Log("websocket reconnect failed. reason = {}", e.what());
    ws_.reset();
    timer_websocket_.expires_at(timer_websocket_.expiry() +
                                boost::asio::chrono::seconds(10));
    timer_websocket_.async_wait(
        [this]([[maybe_unused]] const boost::system::error_code &error) {
          boost::asio::spawn(io_context_,
                             [this](const boost::asio::yield_context &yield) {
                               this->startAndServeWs(yield);
                             });
        });
  }
}
4

0 に答える 0