0

私のプログラムでは、プロデューサー スレッドがテキスト ファイル (約 8000 行のテキスト) からテキスト行を読み取り、その行を同時実行キューに読み込みます。

3 つのコンシューマー スレッドがキューから行を読み取り、それぞれが別のファイルに書き込みます。

プログラムを実行すると、プロデューサー スレッドとコンシューマー スレッドの 1 つだけが完了します。他の 2 つのスレッドはハングしているようです。

すべてのコンシューマー スレッドにファイルの終わりに到達したことを確実に伝えるにはどうすればよいでしょうか。

私のプラットフォームは Windows 7 64 ビットです

VC11.

64 ビットと 32 ビットとしてコンパイルされたコードは同じ動作をしました。

これがコードです。(自己完結型でコンパイル可能です)

#include <queue>
#include<iostream>
#include<fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include<string>
#include<memory>


template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable std::mutex the_mutex;
    std::condition_variable the_condition_variable;
public:
    void push(Data const& data){
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        the_condition_variable.notify_one();
    }

    bool empty() const{
        std::unique_lock<std::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    const size_t size() const{
        std::lock_guard<std::mutex> lock(the_mutex);
        return the_queue.size();
    }

    bool try_pop(Data& popped_value){
        std::unique_lock<std::mutex> lock(the_mutex);
        if(the_queue.empty()){
            return false;
        }
        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value){
        std::unique_lock<std::mutex> lock(the_mutex);
        while(the_queue.empty()){
            the_condition_variable.wait(lock);
        }
        popped_value=the_queue.front();
        the_queue.pop();
    }
};

std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;

void producer()
{
    done.store(false);
    std::ifstream inFile("c:/sample.txt");
    if(!inFile.is_open()){
        std::cout << "Can't read from file\n";
        return;
    }

    std::string line;
    segment seg;
    int cnt = 0;
    while(std::getline(inFile,line)){
        seg.push_back(line);
        ++cnt;
        if( cnt == one_block ){
            data.push( seg );
            seg.clear();
            cnt = 0;
        }
    }
    inFile.close();
    done.store(true);
    std::cout << "all done\n";
}

void consumer( std::string fname)
{
    std::ofstream outFile(fname.c_str());
    if(!outFile.is_open()){
        std::cout << "Can't write to file\n";
        return;
    }

    do{
        while(!data.empty()){
            segment seg;
            data.wait_and_pop( seg );
            for(size_t i = 0; i < seg.size(); ++i)
            {
                outFile << seg[i] << std::endl;
            }
            outFile.flush();
        }
    } while(!done.load());
    outFile.close();
    std::cout << fname << "  done.\n";
}

int main()
{
    std::thread th0(producer);
    std::thread th1(consumer, "Worker1.txt");
    std::thread th2(consumer, "Worker2.txt");
    std::thread th3(consumer, "Worker3.txt");

    th0.join();
    th1.join();
    th2.join();
    th3.join();

    return 0;
}
4

3 に答える 3

1

キューで待機しているすべてのスレッドを終了するために私が使用しているアプローチは、pop()関数内に要素があることを確認する前にテストされるかどうかを示すフラグをキューに設定することです。フラグがプログラムを停止する必要があることを示している場合pop()、キューに要素がない場合、スレッド呼び出しは例外をスローします。フラグが変更されると、変更スレッドnotify_all()は対応する条件変数を呼び出すだけです。

于 2012-12-29T20:54:25.763 に答える
0

次のコードを見てください。

while(!data.empty()){
    segment seg;
    data.wait_and_pop( seg );
    ...

データの最後のセグメントを読み取る状況を考えてみましょう。そして消費者は、データが読み取られるのを待っていますth1th2

コンシューマーは、読み取るデータがあるth1かどうかを確認し!data.empty()て見つけます。次に、 をth1呼び出す前にdata.wait_and_pop()、消費者はそれが真であることをth2確認し!data.empty()て見つけます。消費者th1が最後のセグメントを消費するとします。現在、読み取るセグメントがないため、 でth2無期限に待機the_queue.empty()data.wait_and_pop()ます。

上記のスニペットの代わりに、次のコードを試してください。

segment seg;
while(data.try_pop(seg)){
    ...

動作するはずです。

于 2012-12-29T20:53:12.523 に答える
0

おそらくブール値のフラグを に追加したいと思うでしょうconcurrent_queue。ファイルが読み取られたら、(ミューテックスの下で) 設定します。ファイルが読み取ら、キューが空になったら、 を使用して、キューを空にしたコンシューマーから条件変数をブロードキャストしますnotify_all

これにより、最終的な状態 (フラグが設定され、キューが空である) を見つけてループを終了する必要がある他のすべてのコンシューマーが目覚めます。競合状態を回避するには、最初に待機する前に、同じ複合状態をチェックする必要があることを意味します。

既存のフラグの問題は、スレッドが condvar の待機から復帰せず、チェックしないことです。「終了」フラグは、待機中の状態の一部である必要があります。

[編集: フラグに対する Dietmar の微妙な意味の違いにより、おそらくコードはより単純になりますが、比較するために両方を書いていません。]

于 2012-12-29T20:54:48.093 に答える