7

ブースト ライブラリ (asio) を使用して、UDP ソケットからデータを受信するためだけに専用の自律スレッドを作成したいと考えています。このスレッドは、UDP ソケットから受信したデータによってトリガーされる無限ループである必要があります。私のアプリケーションでは、非同期受信操作を使用する必要があります。

同期関数 receive_from を使用すると、すべてが期待どおりに機能します。

ただし、async_receive_from を使用すると、ハンドラーは呼び出されません。一部のデータが受信されたことを検出するためにセマフォを使用するため、プログラムがロックされ、ループがトリガーされることはありません。

送信側デバイスが UDP ソケットでデータを適切に送信することを (ネットワーク アナライザーで) 確認しました。

次のコードで問題を特定しました。

#include <boost\array.hpp>
#include <boost\asio.hpp>
#include <boost\thread.hpp>
#include <boost\interprocess\sync\interprocess_semaphore.hpp>

#include <iostream>

typedef boost::interprocess::interprocess_semaphore Semaphore;

using namespace boost::asio::ip;

class ReceiveUDP
{
public:

    boost::thread*  m_pThread;

    boost::asio::io_service         m_io_service;
    udp::endpoint                   m_local_endpoint;
    udp::endpoint                   m_sender_endpoint;

    udp::socket                     m_socket;

    size_t      m_read_bytes;
    Semaphore   m_receive_semaphore;

    ReceiveUDP() :
        m_socket(m_io_service),
        m_local_endpoint(boost::asio::ip::address::from_string("192.168.0.254"), 11),
        m_sender_endpoint(boost::asio::ip::address::from_string("192.168.0.11"), 5550),
        m_receive_semaphore(0)
    {
        Start();
    }

    void Start()
    {
        m_pThread = new boost::thread(&ReceiveUDP::_ThreadFunction, this);
    }

    void _HandleReceiveFrom(
        const boost::system::error_code& error,
        size_t                                  received_bytes)
    {
        m_receive_semaphore.post();

        m_read_bytes = received_bytes;
    }

    void _ThreadFunction()
    {
        try
        {
            boost::array<char, 100> recv_buf;

            m_socket.open(udp::v4());
            m_socket.bind(m_local_endpoint);
            m_io_service.run();

            while (1)
            {
#if 1 // THIS WORKS

                m_read_bytes = m_socket.receive_from(
                    boost::asio::buffer(recv_buf), m_sender_endpoint);

#else // THIS DOESN'T WORK

                m_socket.async_receive_from(
                    boost::asio::buffer(recv_buf),
                    m_sender_endpoint,
                    boost::bind(&ReceiveUDP::_HandleReceiveFrom, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

                /* The program locks on this wait since _HandleReceiveFrom
                is never called. */
                m_receive_semaphore.wait();

#endif

                std::cout.write(recv_buf.data(), m_read_bytes);
            }

            m_socket.close();
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }
};

void main()
{
    ReceiveUDP  receive_thread;

    receive_thread.m_pThread->join();
}

セマフォで timed_wait を使用することをお勧めしますが、デバッグの目的で、上記のコードのようにブロッキング待機を使用しました。

私は何か見落としてますか?私の間違いはどこですか?

4

2 に答える 2

10

io_service.run()が行う作業がないため、への呼び出しは終了io_serviceしています。その後、コードはwhileループに入り、 を呼び出しますm_socket.async_receive_from。この時点でio_serviceは実行されていないため、データを読み取らず、ハンドラを呼び出します。

io_service run を呼び出す前に、実行する作業をスケジュールする必要があります。

すなわち:

// Configure io service
ReceiveUDP  receiver;

m_socket.open(udp::v4());
m_socket.bind(m_local_endpoint);
m_socket.async_receive_from(
    boost::asio::buffer(recv_buf),
    m_sender_endpoint,
    boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred));

ハンドラー関数は次のことを行います。

// start the io service
void HandleReceiveFrom(
    const boost::system::error_code& error,
    size_t received_bytes)
{
    m_receive_semaphore.post();

    // schedule the next asynchronous read
    m_socket.async_receive_from(
        boost::asio::buffer(recv_buf),
        m_sender_endpoint,
        boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));

    m_read_bytes = received_bytes;
}

次に、スレッドは単にセマフォを待ちます。

while (1)
{
    m_receive_semaphore.wait();
    std::cout.write(recv_buf.data(), m_read_bytes);
}

ノート:

  1. この追加スレッドは本当に必要ですか? ハンドラーは完全に非同期であり、boost::asio を使用してスレッド プールを管理できます ( think-asyncを参照) 。
  2. 変数/関数名には、アンダースコアの後に大文字を使用しないでください。それらは予約されています。
于 2012-11-27T09:46:42.990 に答える
0

m_io_service.run()すぐに戻るので、誰も完了ハンドラをディスパッチしません。これio_service::runは、asio ベースのアプリケーションの一種の「メッセージ ループ」であり、asio 機能を利用できる限り実行する必要があることに注意してください (これは少し簡略化された説明ですが、ケースには十分です)。

また、ループ内で async.operation を呼び出さないでください。代わりに、前の完了ハンドラーで後続の async.operation を発行して、2 つの async.reads が同時に実行されないようにします。

asio の例を参照して、典型的な asio アプリケーションの設計を確認してください。

于 2012-11-27T09:50:24.687 に答える