5

async_readソケットでの非ブロック読み取りを許可するラッパー同期メソッドを作成しようとしています。インターネットに関するいくつかの例に従って、ほぼ正しいと思われるソリューションを開発しましたが、機能していません。

このクラスは、次の関連する属性とメソッドを宣言します。

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;

        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);

        ...
}

メソッドはすべての複雑さをカプセル化するものでありasync_read_helper、他の 2 つとは単なるイベント ハンドラーです。3 つのメソッドの実装を次に示します。handle_readhandle_timeout

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}

size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;

    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));

    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }

    return _bytes_transferred;
}

私が持っている主な質問は、なぜこれがループをオンにしてループ_io_service->poll_one()なしでループと呼び出しなしで機能するの_io_service->run_one()ですか? また、Boost と Asio の操作に慣れている人にとっては正しいように見えるかどうかも知りたいです。ありがとうございました!


修正提案 #1

Jonathan Wakelyのコメントによると、ループは、操作が終了した後に_io_service->run_one()への呼び出しを使用して置き換えることができます。_io_service->reset()次のようになります。

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}

_io_service->reset();

いくつかのテストの後、この種のソリューションだけでは機能しないことを確認しました。メソッドは、handle_timeoutエラー コード で継続的に呼び出されていますoperation_aborted。これらの通話を停止するにはどうすればよいですか?

修正提案 #2

twsansburyによる回答は正確であり、確かなドキュメントに基づいています。その実装により、 内で次のコードが生成されますasync_read_helper

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();

handle_readメソッドに次の変更を加えます。

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}

このソリューションは、テスト中に確実で正しいことが証明されました。

4

1 に答える 1

8

io_service::run_one()との主な違いはio_service::poll_one()run_one()ハンドラーを実行する準備ができるまでブロックするのに対し、poll_one()未処理のハンドラーが準備できるまで待機しないことです。

の唯一の未処理のハンドラー_io_servicehandle_timeout()handle_read()であると仮定すると、ループは1回だけ返されるか、実行run_one()されたため、ループは必要ありません。一方、ループが必要なのは、実行する準備ができていないため、すぐに戻るため、関数が最終的に戻る原因になるためです。handle_timeout()handle_read()poll_one()poll_one()handle_timeout()handle_read()

元のコードと修正提案#1の主な問題は、戻ったときにio_serviceに未処理のハンドラーが残っていることasync_read_helper()です。の次の呼び出し時に、async_read_helper()呼び出される次のハンドラーは、前の呼び出しのハンドラーになります。このio_service::reset()メソッドは、io_serviceが停止状態から実行を再開することのみを許可し、io_serviceにすでにキューに入れられているハンドラーを削除しません。この動作を説明するには、ループを使用してio_serviceのすべてのハンドラーを使用してみてください。すべてのハンドラーが消費されたら、ループを終了してio_serviceをリセットします。

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timer, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occured, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

呼び出し元の観点からは、この形式のタイムアウトはrun_one()ブロックとして同期されます。ただし、I/Oサービス内での作業はまだ行われています。別の方法は、Boost.AsioのC ++先物のサポートを使用して、先物を待機し、タイムアウトを実行することです。このコードは読みやすくなりますが、タイムアウトを待機しているスレッドがI / Oサービスを処理しなくなったため、I/Oサービスを処理するために少なくとも1つの他のスレッドが必要です。

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then on_read.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}
于 2012-06-03T05:09:09.220 に答える