4

そのため、先物のコンテナを作成するだけでなく、すべての future.get() をブロックしない方法で実行するための一般的な方法を作成しようとしています。

タスクの完了にかかる時間は、通常、数百ミリ秒から最大 2 分であると予想されます。ただし、一部はまったく完了しない場合があります。通常の実行では、少なくとも 10,000 のタスクを実行する必要があります。

futures コンテナー内の他の実行時間の長いタスクに妨げられることなく、最も速く返されるタスクの結果が返されることを望みます。

これまでのところ、タスク完了の遅延をシミュレートするためにダミーのスリープ時間を使用しているだけです (設計は、主に、 thisthisなど、ここで作成された優れた投稿に感謝します):

#include <future>
#include <vector>
#include <iostream>
#include <random>
#include <chrono>
#include <ratio>
#include <thread>
#include <algorithm>

size_t rand_from_range(const size_t, const size_t);
int rand_sleep_range(const size_t, const size_t);
template<class CT> size_t get_async_all( CT& );

// Given a function and a collection,
//  return a vector of futures.
template<class Function, class CT>
auto async_all( Function f, CT coll )
    -> std::vector<decltype(std::async(f, *std::begin(coll)))>
{
  std::vector<decltype(std::async(f, *std::begin(coll)))> futures;
  futures.reserve(coll.size());
  for (auto& element : coll)
    futures.push_back(std::async(f, element));
  return futures;
}

// Given the beginning and end of a number
//  range, return a random number therein.
size_t rand_from_range( const size_t range_begin, 
                        const size_t range_end )
{
  std::uniform_int_distribution<size_t> 
    distr(range_begin, range_end);
  std::random_device dev;
  return distr(dev);
} 

// Given a shortest and longest duration, put the calling
//  thread to sleep for a random duration therein. 
// (in milliseconds)
int rand_sleep_range( const size_t shortest_time, 
                      const size_t longest_time )
{
  std::chrono::milliseconds 
    sleep_time(rand_from_range(shortest_time, longest_time));
  std::this_thread::sleep_for(sleep_time);
  return (int)sleep_time.count();
} 

// Given a container of futures, perform all
//  get()'s.
template<class CT>
size_t get_async_all( CT& async_coll )
{
  size_t get_ctr(0);
  const size_t future_cnt = async_coll.size();
  std::vector<size_t> completed;
  completed.reserve(future_cnt);

  while (true) {
    for (size_t ndx = 0; ndx < future_cnt; ++ndx) {
      // Check to see if this ndx' future has completed already.
      if (std::none_of(std::begin(completed), std::end(completed), 
            [=](size_t x) {
              return (x == ndx);
            }))
      { // No, this one hasn't completed 
        //  yet, attempt to process it.
        auto& f = async_coll[ndx];
        if (f.wait_for(std::chrono::milliseconds(10)) 
              == std::future_status::ready) 
        {
          f.get(); // The future's work gets done here.
          ++get_ctr;
          completed.push_back(ndx);
          if (completed.size() == future_cnt) 
            break; // for()
        }
      }
    }
    if (completed.size() == future_cnt) 
      break; // while()
  }
  return get_ctr;
}

int main()
{
  // A dummy container of ints.
  std::vector<int> my_vec(100);
  for (auto& elem : my_vec)
    elem = rand_from_range(1, 100);

  // A dummy function lambda.
  auto my_func = [](int x) { 
    int x_ = x;
    int sleep_time = rand_sleep_range(100, 20000); // in ms.
    x *= 2;
    std::cout << " after sleeping " << sleep_time << "ms \t"
              << "f(" << x_ << ") = " << x << std::endl;
  };

  // Create and execute the container of futures.
  auto async_coll = async_all(my_func, my_vec);
  size_t count = get_async_all(async_coll);

  std::cout << std::endl << count << " items completed. \n";
}

だから、私の質問は次のとおりです。

  • 私が使用しているアプローチに落とし穴はありますか?
  • 私が使用しているものよりも get_async_all() のためのより良い/よりエレガントなアプローチはありますか? または、私がしていることは何でも。

時間を割いてコードを確認し、建設的な批判やフィードバックをくれた人に感謝します。

4

1 に答える 1

3

少なくとも 1 つの落とし穴があります。起動ポリシーを指定せずに呼び出すstd::asyncと、一部またはすべてのタスクが遅延して実行される可能性があります。しかし、タスクが完了したかどうかを確認するテストでは、std::future_status_ready. タスクが延期された場合、常に がstd::future_status_deferred返されます。つまり、テストで true が返されることはありません。

この問題に対する最も簡単な解決策は、起動ポリシーを に指定するstd::launch::asyncことですが、そうすると、システムをオーバーサブスクライブするリスクがあります。別の方法として、遅延タスクをチェックするようにテストを変更することもできますが、問題はそれらをどうするかです。getまたはそれらを呼び出すとwait、任意の時間ブロックされます。

一般的なアプローチに関しては、ポーリング時に各タスクが完了するのを 10 ミリ秒待機するのではなく、0 ミリ秒待機することを検討してください。つまり、タスクが終了したかどうかを確認するために純粋なポーリングを実行します。これにより、タスクが終了してから処理するまでの待ち時間が短縮される可能性がありますが、ポーリング オーバーヘッドが増加し、システム全体の動作が遅くなる可能性があります。

まったく異なるアプローチは、各タスクのポーリングを放棄し、代わりに各タスクに「完了」フラグを共有データ構造 (たとえば、 a std::deque) に書き込み、そのデータ構造を定期的にポーリングして、その中に何かがあるかどうかを確認することです。 . その場合は、完了したタスクを処理し、データ構造から削除してから、再度ポーリングするまでスリープ状態に戻ります。タスクがpush_backデータ構造に対して実行する場合は、完了した順序で自然に処理できます。この設計の欠点は、共有データ構造がパフォーマンスのボトルネックになる可能性があることです。

于 2013-02-12T15:19:30.320 に答える