2

すでにジョブの抽象化レイヤーを作成するためのWorkerクラスとクラスがあります。Handlerミックスに非同期性を注ぎ込むために使用したかったstd::asyncのですが、Visual Studio 2012(アップデート1)から奇妙な動作が発生しました。

私のクラス階層は次のとおりです。

  • Workerは、純粋仮想メソッドを使用する抽象クラスInitですWork
  • BasicWorker : Worker単にprintfいくつかの出力に使用しています。
  • GroupWorker : Worker他の労働者の集合体です。
  • HandlerWorkerいくつかの仕事をするために保持します。

次にstd::async、ワーカーとハンドラーを作成するいくつかのメソッドを呼び出し、ネストされた呼び出しでハンドラーを呼び出し、ワーカーのstd::async初期化(ここ)を待ってstd::condition_variableからハンドラーを停止します。

最後に、すべてのが終了するのを待ちstd::futureます。

コードは次のとおりです。

#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>

struct Worker
{
    virtual ~Worker() { }
    virtual void Init() = 0;
    virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
    virtual ~BasicWorker() { }
    virtual void Init()
    {
        printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
    }

    virtual void Work()
    {
        printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
    }
};

struct GroupWorker : public Worker
{
    GroupWorker()
    {
        workers.push_back(std::make_shared<BasicWorker>());
    }

    virtual ~GroupWorker() { }

    virtual void Init()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Init();
        }
        initEvent.notify_all();
    }

    virtual void Work()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Work();
        }
    }

    void WaitForInit()
    {
        //std::unique_lock<std::mutex> initLock(initMutex);
        //initEvent.wait(initLock);
    }
private:
    std::mutex initMutex;
    std::condition_variable initEvent;
    std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
    static const int Stopped = -1;
    static const int Ready = 0;
    static const int Running = 1;

    Handler(const std::shared_ptr<Worker>& worker) :
        worker(worker)
    { }

    void Start(int count)
    {
        int readyValue = Ready;
        if(working.compare_exchange_strong(readyValue, Running))
        {
            worker->Init();

            for(int i = 0; i < count && working == Running; ++i)
            {
                worker->Work();
            }
        }
    }

    void Stop()
    {
        working = Stopped;
    }
private:
    std::atomic<int> working;
    std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
    //printf("Start: %d\n", jobIndex);
    return std::async(std::launch::async, [=]()
    {
        printf("Async: %d\n", jobIndex);
        auto worker = std::make_shared<GroupWorker>();
        auto handler = std::make_shared<Handler>(worker);

        auto result = std::async(std::launch:async, [=]()
        {
            printf("Nested async: %d\n", jobIndex);
            handler->Start(runCount);
        });

        worker->WaitForInit();
        handler->Stop();

        result.get();
    });
}

int main()
{
    const int JobCount = 300;
    const int RunCount =  5;
    std::array<std::future<void>, JobCount> jobs;

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i] = Start(i, RunCount);
    }

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i].get();
    }
}

私の問題は:

  • 関数内の行のコメントを解除するとWaitForInit@GroupWorker、ネストされた非同期関数呼び出しは、すべての第1レベルの非同期関数呼び出しが行われるまで行われません。
  • ジョブの数を増やすのを待っstd::condition_variableている間、新しいスレッドの作成は指数関数的に遅くなるように感じます。100未満のジョブのトライアルでは、非同期が存在しますが、300を超えると、ジョブを作成するのは完全にシーケンシャルです。
  • 次にprintf、メソッドの行のコメントを解除するStartと、ネストされたすべての非同期が魅力のように機能します

それで、

  • 使用法で何が間違っていますstd::condition_variableか?
  • 何百ものスレッドのように、ジョブの作成が遅くなるのはなぜですか?(この質問はオプションであり、OSの問題のようであり、スマートスレッドプールの概念で修正できます)
  • これとは何printfの関係がありますか?(競合状態の場合にすべての呼び出しを削除しようとprintfしましたが、コードにブレークポイントを設定しましたが、ヘルプはありません。これも同じですstd::cout

編集:スレッドの作成を保証するための起動ポリシー(Jonathan Wakelyによって提案された)を追加しました。しかし、それも役に立ちませんでした。私は現在、第1レベルの非同期内で待機するための関数std::threadと呼び出し関数を作成しています。thread::join

4

1 に答える 1

1

NBを呼び出すことは問題ありませんが、に変換可能でprintfあると想定しないでください。次のように、少しポータブルにすることができます。std::thread::idint

inline long tol(std::thread::id id)
{
  std::ostringstream ss;
  ss << id;
  return stol(ss.str());
}

(これは、の文字​​列値std::thread::idがに変換できることを前提としていますlong。これは必須ではありませんが、暗黙の変換を想定するよりも可能性が高いですint

std :: condition_variableの使用法で何が間違っていますか?

待機している「条件」はなく、へのnotify_all呼び出しの前に呼び出しが確実に行われるようにするための同期もありませんwait。によって設定される「このワーカーは初期化されました」というメンバー変数が必要ですInit。条件変数が真でない場合にのみ待機します(データの競合を防ぐために、フラグはアトミックであるか、ミューテックスによって保護されている必要があります)。 。

何百ものスレッドのように、ジョブの作成が遅くなるのはなぜですか?(この質問はオプションであり、OSの問題のようであり、スマートスレッドプールの概念で修正できます)

何百ものスレッドがあるため、共有リソースに対する多くの競合があり、OSスケジューラーに多くのプレッシャーがかかるため、実装はおそらく、非同期関数ではなく、遅延関数(つまり、std::asyncで呼び出されたかのように)を返し始めることを決定します。非同期ワーカーとそのネストされた非同期ワーカーの両方が遅延関数として実行される場合、外部関数はネストされた関数の呼び出しを待機するのをブロックしますstd::launch::deferredが、ネストされた関数は1つは呼び出します。あなたのプログラムは移植性がなく、(私が正しく理解していれば)MSVCのためにWindowsでのみ動作しますasyncInitresult.get()asyncスレッドが使用可能になった場合に遅延関数を実行するワークスティーリングスレッドプールを使用します。これは規格では必須ではありません。 各ワーカーに新しいスレッドを強制する場合は、std::launch::asyncポリシーを使用します。

printfはこれと何の関係がありますか?(競合状態の場合にすべてのprintf呼び出しを削除しようとしましたが、コードにブレークポイントを設定しましたが、ヘルプはありません。std:: coutの場合も同じです)

スレッドは現在、単一のグローバルリソースをめぐって競合しており、おそらく競合しているため、わずかな遅延が発生し、スレッド間に信頼性の低い順序付けが発生する可能性があります。によって課せられる遅延はprintf、1つのスレッドが終了するのに十分である可能性があります。これにより、そのリソースがスレッドプールに解放され、別の非同期ワーカーを実行できるようになります。

于 2012-12-11T00:45:17.147 に答える