4

大規模なプロジェクトで ThreadSanitizer の警告をクリーンアップする作業を行っています。特にこの正確なケースでは、ファイル、プロデューサーから読み取る生成されたスレッドがあります。次に、スレッド プールの一部として 1 つ以上の圧縮解除スレッドがあります。最後に、解凍されたブロックを取得して実際に処理を行う 1 つのスレッドがあります。もちろん、これにより、複数のブロックを同時に解凍できます。

プロジェクトがアトミック bool と を介して同期する場所はたくさんありますがusleep()、特にこれを含みます。もちろん、これは理想的ではなく、私に割り当てられたものの 1 つです。

しかし、ミューテックスとロックが表現されている限り、ThreadSanitizer が不平を言う問題が何であるかはわかりません (condition_variable を使用する場合と比較して潜在的に効率が低下することを除いて)。

ThreadSanitizer は、「スリープ経由で同期されているかのように」データ競合について不平を言い、usleep()呼び出された場所の場所を提供します。私の問題は、もちろんスリープによる同期は理想的ではないということですが、ミューテックスが尊重されている限り、データ競合は見られません。ミューテックスの仕組みを理解している限り、それらは尊重されていると思います。

そのため、ThreadSanitizer が不平を言っていることを正確に特定するために、再現手順の最小限のセットを作成しようとしています。

これが私が思いついたコードです:

// g++ --std=c++11 -lpthread as-if-synchronized-via-sleep.cpp -g -fsanitize=thread -pie -fPIC
#include <iostream>
#include <iomanip>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <chrono>
#include <cstdlib>

#include <unistd.h>

class Data {
public:
        char uncompressed[1024];
        char compressed[1024];
        std::atomic<bool> done = {false};
};

int main(int argc, char **argv) {
        std::atomic<bool> done = {false};
        std::atomic<int> count = {0};

        std::mutex m;
        std::vector<Data*> v;

        std::thread provider{[&](){
                while ( not done ) {
                        while ( 100 < count ) {
                                usleep(20);
                                if ( done ) {
                                        break;
                                }
                        }
                        Data *data = new Data();
                        // Simulate reading in compressed data
                        for ( size_t i = 0; i < sizeof(data->uncompressed); ++i )
                                data->uncompressed[i] = char(rand());
                        std::unique_lock<std::mutex> l(m);
                        v.push_back(data);
                        l.unlock();
                        ++count;
                }
                return;
        }};

        std::thread decompressor{[&](){
                while ( not done ) {
                        while ( 0 == count ) {
                                usleep(20);
                                if ( done ) {
                                        break;
                                }
                        }
                        std::unique_lock<std::mutex> l(m);
                        if ( v.empty() ) {
                                continue;
                        }
                        Data* data = v.front();
                        // Simulate decompressing it.
                        for ( size_t i = 0; i < sizeof(data->compressed); ++i )
                                data->compressed[i] = data->uncompressed[i] ^ char(0xff);
                        data->done = true;
                }
        }};

        std::thread consumer{[&](){
                while ( not done ) {
                        std::unique_lock<std::mutex> l(m);
                        while ( not v.empty() ) {
                                if ( done ) {
                                        break;
                                }
                                if ( v.front()->done ) {
                                        break;
                                }
                                l.unlock();
                                usleep(20);
                                l.lock();
                        }
                        if ( done ) {
                                break;
                        }
                        // Simulate consuming it
                        Data* data = v.front();
                        v.erase(v.begin());
                        l.unlock();
                        // Pretend we're doing stuff with decompressed data
                        std::vector<char> vv(1,1);
                        for ( std::size_t i = 0; i < sizeof(data->uncompressed); ++i ) {
                                if ( data->uncompressed[i] )
                                        vv.back() += data->uncompressed[i];
                                else
                                        vv.emplace_back();
                        }
                        delete data;
                        size_t n = 0;
                        for ( auto c : vv ) {
                                std::cout << std::hex << std::setw(2) << (int)c;
                        }
                }
        }};
        std::cout << "Main sleeping" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        done = true;
        std::cout << "Main joining..." << std::endl;
        provider.join();
        decompressor.join();
        consumer.join();

        std::cout << "Cleaning up" << std::endl;
        std::unique_lock<std::mutex> l(m);
        while ( not v.empty() ) {
                delete v.back();
                v.erase(std::next(v.rbegin()).base());
        }

        std::cout << "Done!" << std::endl;
        return 0;
}

最初のコメントが示すようにそのコードをコンパイルすると、GCC 4.9.1 を使用して、実行の最後に ThreadSanitizer から次のメッセージが表示されます。

Main sleeping
4f72ffffffa8ffffffb9Main joining...
==================
WARNING: ThreadSanitizer: data race (pid=64255)
  Write of size 8 at 0x7d100000dfc8 by thread T3:
    #0 operator delete(void*) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:592 (libtsan.so.0+0x000000049480)
    #1 deallocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 (a.out+0x000000004ec9)
    #2 deallocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:383 (a.out+0x000000004b98)
    #3 _M_destroy /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:535 (a.out+0x000000005fbc)
    #4 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:166 (libstdc++.so.6+0x0000000b5b30)
    #5 ~__shared_count /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:666 (libstdc++.so.6+0x0000000b5b30)
    #6 ~__shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:914 (libstdc++.so.6+0x0000000b5b30)
    #7 ~shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr.h:93 (libstdc++.so.6+0x0000000b5b30)
    #8 execute_native_thread_routine /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:95 (libstdc++.so.6+0x0000000b5b30)

  Previous atomic write of size 4 at 0x7d100000dfc8 by main thread:
    #0 __tsan_atomic32_fetch_add /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interface_atomic.cc:468 (libtsan.so.0+0x0000000206ce)
    #1 __exchange_and_add /home/keithb/include/c++/4.9.1/ext/atomicity.h:49 (a.out+0x0000000021f4)
    #2 __exchange_and_add_dispatch /home/keithb/include/c++/4.9.1/ext/atomicity.h:82 (a.out+0x0000000022ab)
    #3 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:146 (a.out+0x000000007e99)
    #4 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:666 (a.out+0x000000007346)
    #5 std::__shared_ptr<std::thread::_Impl_base, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:914 (a.out+0x000000007019)
    #6 std::shared_ptr<std::thread::_Impl_base>::~shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:93 (a.out+0x000000007045)
    #7 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031eb)
    #8 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

  Location is heap block of size 64 at 0x7d100000dfc0 allocated by main thread:
    #0 operator new(unsigned long) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:560 (libtsan.so.0+0x0000000496c2)
    #1 allocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:104 (a.out+0x000000004e28)
    #2 allocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:357 (a.out+0x000000004aed)
    #3 __shared_count<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:616 (a.out+0x000000004735)
    #4 __shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:1090 (a.out+0x00000000443f)
    #5 shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:316 (a.out+0x00000000425f)
    #6 allocate_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:588 (a.out+0x000000004118)
    #7 make_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:604 (a.out+0x000000003e74)
    #8 _M_make_routine<std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/thread:193 (a.out+0x000000003925)
    #9 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031b9)
    #10 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

  Thread T3 (tid=64259, running) created by main thread at:
    #0 pthread_create /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:877 (libtsan.so.0+0x000000047bf3)
    #1 __gthread_create /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:662 (libstdc++.so.6+0x0000000b5be0)
    #2 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>) /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:142 (libstdc++.so.6+0x0000000b5be0)
    #3 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

SUMMARY: ThreadSanitizer: data race /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 deallocate
==================
Cleaning up
Done!
ThreadSanitizer: reported 1 warnings

すべてのスレッドが結合された後、ラムダ、具体的には共有ポインターのデストラクタのように見えるもので、tsan は最終的に「スリープ経由で同期されているかのように」と不平を言います。これは C++11 の機能であり、Google は ThreadSanitizer が C++11 をあまり好まないという多くの証拠を提供しているため、それ自体は「本当の」競争ではないと思います。

とはいえ...ラムダがスレッドへのエントリポイントとして使用されているという事実...ラムダが破棄される前にスレッドが完全に終了していない実際のレースなのだろうか?それはコンパイラのバグの領域にあるため、これ以上調査する必要はありません (スコープ クリープなど)。

だから私の質問(私はそれらがいくらかオープンエンドであることを知っています、それらを絞り込むのを手伝ってください?)

この場合、ThreadSanitizer がプロデューサー、デコンプレッサ、およびコンシューマで「スリープ経由で同期されたかのように」データ競合を検出しないのはなぜですか? さらに、このコードを変更して...

A) ...「通常」クラッシュを引き起こさない ThreadSanitizer 警告を発行する真のデータ競合を生成します (問題のソフトウェアの現在の製品リリースは、この想定されるデータ競合の結果としてクラッシュするようには見えません)?

B) ...一見するとデータ競合ではないように見えますが、知識や経験からそうではないことが明らかになる真のデータ競合を生み出しますか?

C) ...ThreadSanitizer から偽陽性を生成しますが、実際にはデータ競合はありません。

4

1 に答える 1