2

複数の io_services で boost:asio を使用して、さまざまな形式のブロッキング I/O を分離しています。たとえば、ファイル I/O をブロックするための io_service と、長時間実行される CPU バウンド タスク用の io_service があります (ネットワーク I/O をブロックするために、これを 3 分の 1 に拡張することもできます)。 I/O をブロックしても、他の I/O が不足することはありません。

私が抱えている問題は、ある io_service で実行されているタスクがイベントを他の io_service に投稿できるためです (たとえば、CPU にバインドされたタスクがファイル I/O 操作を開始する必要があるか、完了したファイル I/O 操作が CPU を呼び出す可能性があります)。バインドされたコールバック)、両方の io_services がイベントを終了するまで、両方の io_services を実行し続ける方法がわかりません。

通常、単一の I/O サービスでは、次のようにします。

 shared_ptr<asio::io_service> io_service (new asio::io_service);
 shared_ptr<asio::io_service::work> work (
   new asio::io_service::work(*io_service));

 // Create worker thread(s) that call io_service->run()

 io_service->post(/* some event */);

 work.reset();

 // Join worker thread(s)

ただし、単純に両方の io_services に対してこれを行うと、最初のイベントを投稿しなかった io_services がすぐに終了します。また、最初のイベントを両方に投稿したとしても、io_service A のタスクが B に新しいイベントを投稿する前に io_service B の最初のイベントが終了すると、io_service B は途中で終了します。

io_service A がまだイベントを処理している間 (サービス A のキューに入れられたイベントの 1 つが B に新しいイベントを投稿する可能性があるため)、io_service B を実行し続けるにはどうすればよいですか? また、その逆も同様です。両方が同時にイベントから外れている場合はどうなりますか?

4

2 に答える 2

3

これを行う方法を考え出したので、他の誰かが検索でこの質問を見つけた場合に備えて、記録のためにそれを文書化します。

  • N個の相互通信io_servicesを作成し、それぞれの作業オブジェクトを作成してから、ワーカースレッドを開始します。

  • ワーカースレッドを実行しない「マスター」io_serviceオブジェクトを作成します。

  • イベントをサービスに直接投稿することを許可しないでください。代わりに、io_servicesへのアクセサ関数を作成します。

    1. マスタースレッドで作業オブジェクトを作成します。
    2. 実際のコールバックを実行する関数でコールバックをラップしてから、作業を削除します。
    3. 代わりに、このラップされたコールバックを投稿してください。
  • メインの実行フローでは、すべてのN io_servicesが開始され、少なくとも1つに作業が投稿されたら、マスターio_serviceでrun()を呼び出します。

  • マスターio_serviceのrun()メソッドが戻ったら、N個の相互通信するio_servicesの初期作業をすべて削除し、すべてのワーカースレッドに参加します。

マスターio_serviceのスレッドが他の各io_serviceで動作するようにすることで、マスターio_serviceの動作がなくなるまでそれらが終了しないようにします。他の各io_servicesが、投稿されたコールバックごとにマスターio_serviceで作業するようにすることで、他のすべてのio_servicesが投稿されたコールバックの処理を終了するまで、マスターio_serviceの作業が不足しないようにします。

例(クラスにカプセル化できます):

shared_ptr<boost::asio::io_service> master_io_service;

void RunWorker(boost::shared_ptr<boost::asio::io_service> io_service) {
  io_service->run();
}

void RunCallbackAndDeleteWork(boost::function<void()> callback,
                              boost::asio::io_service::work* work) {
  callback();
  delete work;
}

// All new posted callbacks must come through here, rather than being posted
// directly to the io_service object.
void PostToService(boost::shared_ptr<boost::asio::io_service> io_service,
                   boost::function<void()> callback) {
  io_service->post(boost::bind(
      &RunCallbackAndDeleteWork, callback,
      new boost::asio::io_service::work(*master_io_service)));
}

int main() {
  vector<boost::shared_ptr<boost::asio::io_service> > io_services;
  vector<boost::shared_ptr<boost::asio::io_service::work> > initial_work;
  boost::thread_pool worker_threads;

  master_io_service.reset(new boost::asio::io_service);

  const int kNumServices = X;
  const int kNumWorkersPerService = Y;
  for (int i = 0; i < kNumServices; ++i) {
    shared_ptr<boost::asio::io_service> io_service(new boost::asio::io_service);
    io_services.push_back(io_service);
    initial_work.push_back(new boost::asio::io_service::work(*io_service));

    for (int j = 0; j < kNumWorkersPerService; ++j) {
      worker_threads.create_thread(boost::bind(&RunWorker, io_service));
    }
  }

  // Use PostToService to start initial task(s) on at least one of the services

  master_io_service->run();

  // At this point, there is no real work left in the services, only the work
  // objects in the initial_work vector.
  initial_work.clear();
  worker_threads.join_all();
  return 0;
}
于 2013-03-08T18:31:04.843 に答える
2

HTTP サーバーの例 2は、役に立つと思われる同様のことを行います。とfor eachをio_service保持するプールの概念を使用します。スレッド プールを使用して各サービスを実行します。vectorshared_ptr<boost::asio::io_service>shared_ptr<boost::asio::io_service::work>io_service

io_serviceこの例では、I/O サービスに作業を分配するためにラウンド ロビン スケジューリングを使用していますが、 A とio_serviceBに特定のタスクがあるため、このケースには当てはまらないと思います。

于 2013-03-07T00:07:12.857 に答える