16

私の問題は次のとおりです。いくつかの操作を非同期で開始しましたが、すべてが完了するまで続行したいと考えています。Boost Asio を使用して、これを行う最も簡単な方法は次のとおりです。何らかのtasks非同期操作をサポートするオブジェクトのコンテナのようなものであるとします。

tasksToGo = tasks.size();
for (auto task: tasks) {
    task.async_do_something([](const boost::system::error_code& ec)
    {
        if (ec) {
            // handle error
        } else {
            if (--taslsToGo == 0) {
                 tasksFinished();
            }
        }
    });
}

このソリューションの問題は、回避策のように感じられることです。Boost 1.54では先物でそれを行うことができますが、同期的にしか待つことができません。これは、run()呼び出された場所とは別のスレッドからのみ可能です。

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

for (auto future: futures) {
    future.wait();
}

このコードは前のものよりもはるかに明確ですが、別のスレッドが必要ですが、これは望ましくありません。次のように使用できるものが欲しい:

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

boost::asio::spawn(ioService, [](boost::asio::yield_context yield)
{
    for (auto future: futures) {
        future.async_wait(yield);
    }
    tasksFinished();

}

同じように使えるものはありますか?

4

2 に答える 2

20

私の知る限り、現在これに対する一流のサポートはありません。ただし、ライブラリの方向性を考えると、この機能が将来利用できなくなったら驚くでしょう。

このタイプの機能のサポートを追加するために、いくつかの論文が提案されています。

  • N3558 - 非同期操作の標準化された表現は特に興味深いものです。と提案when_all(futures)future.next()ます。実装されている場合、非同期チェーンを次のように表すことができます。

    for (auto task: tasks) {
        futures.push_back(task.async_do_something(boost::asio::use_future));
    }
    when_all(futures).then(&tasksFinished);
    
  • N3562 - エグゼキュータとスケジューラは、エグゼキュータを紹介します。asyncこれを使用して、を実行できるコンテキストをより細かく制御できます。Boost.Asio の場合、これには、io_service.

これらの論文はまだ進行中ですが、Boost.Thread のConformance and Extensionページと Boost.Asio のgithubを定期的にチェックして、これらの提案の早期適応を確認することをお勧めします。


1 年前のバージョンの Boost でこの機能が必要だったので、独自のソリューションを開発しました。セマンティクスに関してはまだ大雑把な部分もありますが、正式なものが採用されるまでの参考資料として参考になるかもしれません。コードを提供する前に、質問に基づいたアプリケーションの例を次に示します。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include "async_ops.hpp"

void handle_timer(const boost::system::error_code& error, int x)
{
  std::cout << "in handle timer: " << x << " : "
            << error.message() <<  std::endl;
}

void a() { std::cout << "a" << std::endl; }
void b() { std::cout << "b" << std::endl; }
void c() { std::cout << "c" << std::endl; }

int main()
{
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer1(io_service);
  boost::asio::deadline_timer timer2(io_service);

  // Create a chain that will continue once 2 handlers have been executed.
  chain all_expired = when_all(io_service, 2);

  all_expired.then(&a)  // Once 2 handlers finish, run a within io_service.
             .then(&b)  // Once a has finished, run b within io_service.
             .then(&c); // Once b has finished, run c within io_service.

  // Set expiration times for timers.
  timer1.expires_from_now(boost::posix_time::seconds(2));
  timer2.expires_from_now(boost::posix_time::seconds(5));

  // Asynchrnously wait for the timers, wrapping the handlers with the chain.
  timer1.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 1)));
  timer2.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 2)));

  // Run the io_service.
  io_service.run();
}

次の出力が生成されます。

in handle timer: 1 : Success
in handle timer: 2 : Success
a
b
c

そしてここにあるasync_ops.hpp

#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/bind/protect.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/type_traits/is_integral.hpp>
#include <boost/type_traits/remove_reference.hpp>
#include <boost/utility/enable_if.hpp>

class chain;

namespace detail {

/// @brief Chained handler connects two handlers together that will
///        be called sequentially.
///
/// @note Type erasure is not performed on Handler1 to allow resolving
///       to the correct asio_handler_invoke via ADL.
template <typename Handler1> 
class chained_handler
{
public:
  template <typename Handler2>
  chained_handler(Handler1 handler1, Handler2 handler2)
    : handler1_(handler1),
      handler2_(handler2)
  {}

  void operator()()
  {
    handler1_();
    handler2_();
  }

  template <typename Arg1>
  void operator()(const Arg1& a1)
  {
    handler1_(a1);
    handler2_();
  }

  template <typename Arg1, typename Arg2>
  void operator()(const Arg1& a1, const Arg2& a2)
  {
    handler1_(a1, a2);
    handler2_();
  }

//private:
  Handler1 handler1_;
  boost::function<void()> handler2_;
};

/// @brief Hook that allows the sequential_handler to be invoked
///        within specific context based on the hander's type.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  chained_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler1_);
}

/// @brief No operation.
void noop() {}

/// @brief io_service_executor is used to wrap handlers, providing a
///        deferred posting to an io_service.  This allows for chains
///        to inherit io_services from other chains.
class io_service_executor
  : public boost::enable_shared_from_this<io_service_executor>
{
public:
  /// @brief Constructor.
  explicit
  io_service_executor(boost::asio::io_service* io_service)
    : io_service_(io_service)
  {}

  /// @brief Wrap a handler, returning a functor that will post the
  ///        provided handler into the io_service.
  ///
  /// @param handler Handler to be wrapped for deferred posting.
  /// @return Functor that will post handler into io_service.
  template <typename Handler>
  boost::function<void()> wrap(Handler handler)
  {
    // By binding to the io_service_exectuer's post, the io_service
    // into which the handler can be posted can be specified at a later 
    // point in time.
    return boost::bind(&io_service_executor::post<Handler>,
                       shared_from_this(), handler);
  }

  /// @brief Set the io_service.
  void io_service(boost::asio::io_service* io_service)
  {
    io_service_ = io_service;
  }

  /// @brief Get the io_service.
  boost::asio::io_service* io_service()
  {
    return io_service_;
  }

private:

  /// @brief Post handler into the io_service.
  ///
  /// @param handler The handler to post.
  template <typename Handler>
  void post(Handler handler)
  {
    io_service_->post(handler);
  }

private:
  boost::asio::io_service* io_service_;
};

/// @brief chain_impl is an implementation for a chain.  It is responsible
///        for lifetime management, tracking posting and wrapped functions,
///        as well as determining when run criteria has been satisfied.
class chain_impl
  : public boost::enable_shared_from_this<chain_impl>
{
public:

  /// @brief Constructor.
  chain_impl(boost::shared_ptr<io_service_executor> executor,
             std::size_t required)
    : executor_(executor),
      required_(required)
  {}

  /// @brief Destructor will invoke all posted handlers.
  ~chain_impl()
  {
    run();
  }

  /// @brief Post a handler that will be posted into the executor 
  ///        after run criteria has been satisfied.
  template <typename Handler>
  void post(const Handler& handler)
  {
    deferred_handlers_.push_back(executor_->wrap(handler));
  }

  /// @brief Wrap a handler, returning a chained_handler.  The returned
  ///        handler will notify the impl when it has been invoked.
  template <typename Handler>
  chained_handler<Handler> wrap(const Handler& handler)
  {
    return chained_handler<Handler>(
      handler,                                                 // handler1
      boost::bind(&chain_impl::complete, shared_from_this())); // handler2
  }

  /// @brief Force run of posted handlers.
  void run()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);
    run(guard);
  }

  /// @brief Get the executor.
  boost::shared_ptr<io_service_executor> executor() { return executor_; }

private:

  /// @brief Completion handler invoked when a wrapped handler has been
  ///        invoked.
  void complete()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);

    // Update tracking.
    if (required_)
      --required_;

    // If criteria has not been met, then return early.
    if (required_) return;

    // Otherwise, run the handlers.
    run(guard);    
  }

  /// @brief Run handlers.
  void run(boost::unique_lock<boost::mutex>& guard)
  {
    // While locked, swap handlers into a temporary.
    std::vector<boost::function<void()> > handlers;
    using std::swap;
    swap(handlers, deferred_handlers_);

    // Run handlers without mutex.
    guard.unlock();
    BOOST_FOREACH(boost::function<void()>& handler, handlers)
        handler();
    guard.lock();
  }

private:
  boost::shared_ptr<io_service_executor> executor_;
  boost::mutex mutex_;
  std::size_t required_;
  std::vector<boost::function<void()> > deferred_handlers_;
};

/// @brief Functor used to wrap and post handlers or chains between two
///        implementations.
struct wrap_and_post
{
  wrap_and_post(
    boost::shared_ptr<detail::chain_impl> current,
    boost::shared_ptr<detail::chain_impl> next
  )
    : current_(current),
      next_(next)
  {}

  /// @brief Wrap a handler with next, then post into current.
  template <typename Handler>
  void operator()(Handler handler)
  {
    // Wrap the handler with the next implementation, then post into the
    // current.  The wrapped handler will keep next alive, and posting into
    // current will cause next::complete to be invoked when current is ran.
    current_->post(next_->wrap(handler));
  }

  /// @brief Wrap an entire chain, posting into the current.
  void operator()(chain chain);

private:
  boost::shared_ptr<detail::chain_impl> current_;
  boost::shared_ptr<detail::chain_impl> next_;
};

} // namespace detail

/// @brief Used to indicate that the a chain will inherit its service from an
///        outer chain.
class inherit_service_type {};
inherit_service_type inherit_service;

/// @brief Chain represents an asynchronous call chain, allowing the overall
///        chain to be constructed in a verbose and explicit manner.
class chain
{
public:

  /// @brief Constructor.
  ///
  /// @param io_service The io_service in which the chain will run.
  explicit
  chain(boost::asio::io_service& io_service)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(&io_service),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Constructor.  The chain will inherit its io_service from an
  ///        outer chain.
  explicit
  chain(inherit_service_type)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(
                static_cast<boost::asio::io_service*>(NULL)),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Force run posted handlers.
  void run()
  {
    root_impl_->run();
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  any(std::size_t required = 1)
  {
    return chain(root_impl_, required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param Container of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  any(const Container& container, 
      std::size_t required = 1)
  {
    return post(container, required);
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param Container of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Iterator>
  chain any(Iterator begin, Iterator end,
            std::size_t required = 1)
  {
    return any(boost::make_iterator_range(begin, end), required);
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  all(T required)
  {
    return any<T>(required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when all wrapped handlers from the container 
  ///        have been executed.
  ///
  /// @param Container of handlers to wrap.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  all(const Container& container)
  {
    return any(container, container.size());
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when all wrapped handlers from the iterator range
  ///        have been executed.
  ///
  /// @param Container of handlers to wrap.
  template <typename Iterator>
  chain all(Iterator begin, Iterator end)
  {
    return all(boost::make_iterator_range(begin, end));
  }

  /// @brief Chain link that represents a single sequential link.
  template <typename Handler>
  chain then(const Handler& handler)
  {
    boost::array<Handler, 1> handlers = {{handler}};
    return all(handlers);
  }

  /// @brief Wrap a handler, returning a chained_handler.
  template <typename Handler>
  detail::chained_handler<Handler> wrap(const Handler& handler)
  {
    return impl_->wrap(handler);
  }

  /// @brief Set the executor.
  void executor(boost::asio::io_service& io_service)
  {
    impl_->executor()->io_service(&io_service);
  }

  /// @brief Check if this chain should inherit its executor.
  bool inherits_executor()
  {
    return !impl_->executor()->io_service();
  }

private:

  /// @brief Private constructor used to create links in the chain.
  ///
  /// @note All links maintain a handle to the root impl.  When constructing a
  ///       chain, this allows for links later in the chain to be stored as
  ///       non-temporaries.
  chain(boost::shared_ptr<detail::chain_impl> root_impl,
        std::size_t required)
    : impl_(boost::make_shared<detail::chain_impl>(
              root_impl->executor(), required)),
      root_impl_(root_impl)
  {}

  /// @brief Create a new chain link, wrapping handlers and posting into
  ///        the current chain.
  template <typename Container>
  chain post(const Container& container,
             std::size_t required)
  {
    // Create next chain.
    chain next(root_impl_, required);

    // Wrap handlers from the next chain, and post into the current chain.
    std::for_each(container.begin(), container.end(),
                  detail::wrap_and_post(impl_, next.impl_));

    return next;
  }

private:
  boost::shared_ptr<detail::chain_impl> impl_;
  boost::shared_ptr<detail::chain_impl> root_impl_;
};

void detail::wrap_and_post::operator()(chain c)
{
  // If next does not have an executor, then inherit from current.
  if (c.inherits_executor())
      c.executor(*current_->executor()->io_service());

  // When current completes, start the chain.
  current_->post(boost::protect(boost::bind(&chain::run, c)));

  // The next impl needs to be aware of when the chain stops, so
  // wrap a noop and append it to the end of the chain.
  c.then(next_->wrap(&detail::noop));  
}

// Convenience functions.
template <typename T, typename Handler>
chain async(T& t, const Handler& handler)
{
  return chain(t).then(handler);
}

template <typename T,
          typename Container>
chain when_all(T& t, const Container& container)
{
  return chain(t).all(container);
}

template <typename T,
          typename Iterator>
chain when_all(T& t, Iterator begin, Iterator end)
{
  return chain(t).all(begin, end);
}

template <typename T,
          typename Container>
chain when_any(T& t, const Container& container)
{
  return chain(t).any(container);
}

template <typename T,
          typename Iterator>
chain when_any(T& t, Iterator begin, Iterator end)
{
  return chain(t).any(begin, end);
}

上記のコードを 2 つのスレッドで使用した基本的な高度な例を次に示します。私の表記:

  • a -> ba次に表現するb
  • (a | b)aまたはを表しbます。したがって(a | b) -> c、いつaまたはb終了するかを意味し、次に実行しcます。
  • (a & b)と を表現abます。したがって、との両方が終了したら、 を実行(a & b) -> cすることを意味します。abc

各ケースの前に、チェーンの表記を印刷します。さらに、各関数は、開始時に大文字を出力し、終了時に小文字を出力します。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/assign.hpp>
#include <boost/thread.hpp>
#include "async_ops.hpp"

/// @brief Print identifiers when entering and exiting scope,
///        sleeping between.
void print_and_sleep(char id, unsigned int sleep_time)
{
  std::cout << char(toupper(id));
  boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));
  std::cout << char(tolower(id));
  std::cout.flush();
}

/// @brief Convenience function to create functors.
boost::function<void()> make_fn(char id, unsigned int sleep_time)
{
  return boost::bind(&print_and_sleep, id, sleep_time);  
}

/// @brief Run an io_service with multiple threads.
void run_service(boost::asio::io_service& io_service)
{
  boost::thread_group threads;
  threads.create_thread(boost::bind(
    &boost::asio::io_service::run, &io_service));
  io_service.run();
  threads.join_all();
}

int main()
{
  boost::function<void()> a = make_fn('a', 500);
  boost::function<void()> b = make_fn('b', 1000);
  boost::function<void()> c = make_fn('c', 500);
  boost::function<void()> d = make_fn('d', 1000);
  boost::function<void()> e = make_fn('e', 500);

  {
    std::cout << "a -> b -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .then(b)
      .then(c);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a | b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> (c & d)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d));
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c -> (d & e)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c)
      .all(boost::assign::list_of(d)(e));
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a & b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c | d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .any(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "a -> ((b -> d) | c) -> e\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .any(boost::assign::list_of
           (async(io_service, b).then(d))
           (async(inherit_service, c)))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }
}

次の出力が生成されます。

a -> b -> c
  AaBbCc
(a & b) -> c
  ABabCc
(a | b) -> c
  ABaCbc
(a & b) -> (c & d)
  ABabCDcd
(a & b) -> c -> (d & e)
  ABabCcDEed
(a & b) -> (c & d) -> e
  ABabCDcdEe
(a | b) -> (c | d) -> e
  ABaCbDcEed
(a | b) -> (c & d) -> e
  ABaCbDcdEe
a -> ((b -> d) | c) -> e
  AaBCcEbDed
于 2013-08-21T17:45:56.260 に答える
-1

仕事のコンセプトを使ってみませんか?io_service::run は、作業が利用可能な限り実行され、未完了のタスクがなくなるとすぐに作業が削除されると終了します。

run を呼び出す前に、作業インスタンスを作成します。

boost::shared_ptr<boost::asio::io_service::work> work( new boost::asio::io_service::work( io_service) );

そして、他のスレッドでは、io_servce::run の終了を許可したいときにすぐに呼び出します。

work.reset();

http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_workも参照してください

于 2013-08-20T12:52:14.443 に答える