4

私は、pthreads、semaphore.h、および gcc アトミック ビルトインの上にある循環バッファーに支えられた高性能ブロッキング キューを実装しようとしています。キューは、異なるスレッドからの複数の同時読み取りおよび書き込みを処理する必要があります。

ある種の競合状態を分離しましたが、それがアトミック操作とセマフォの動作に関する誤った仮定なのか、それとも私の設計に根本的な欠陥があるのか​​ はわかりません。

それを抽出して、以下のスタンドアロンの例に単純化しました。このプログラムが二度と戻ってこないことを期待しています。ただし、キューで破損が検出された数十万回の反復後に戻ります。

以下の例 (解説用) では、実際には何も格納されず、実際のデータを保持するセルを 1 に設定し、空のセルを表すために 0 を設定します。空いているセルの数を表すカウンティング セマフォ (Vacancy) と、占有されているセルの数を表すカウンティング セマフォ (Occupants) があります。

ライターは次のことを行います。

  1. 空室を減らす
  2. アトミックに次のヘッド インデックスを取得します (mod キュー サイズ)
  3. それに書き込みます
  4. 占有者を増やす

読者は反対のことをします:

  1. 占有者を減らす
  2. 次のテール インデックスをアトミックに取得する (mod キュー サイズ)
  3. そこから読む
  4. 空室を増やす

上記を考えると、正確に1つのスレッドが特定のセルを一度に読み書きできると思います。

機能しない理由やデバッグ戦略についてのアイデアを歓迎します。以下のコードと出力...

#include <stdlib.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // atomic post increment
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // atomic post increment
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    {
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
} 

int main()
{
    pthread_t id;

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Sink, 0))
            abort();

    while (true);
}

上記を次のようにコンパイルします。

    $ g++ -pthread AboveCode.cpp
    $ ./a.out

出力は毎回異なりますが、一例を次に示します。

    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

私のシステムは Intel Core 2 の Ubuntu 11.10 です:

    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP \
      Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

ありがとう、アンドリュー。

4

3 に答える 3

4

考えられる状況の 1 つ。2 つの書き込みスレッド (W0、W1) と 1 つの読み取りスレッド (R0) について段階的にトレースします。W0 は W1 よりも早く put() に入り、OS またはハードウェアによって中断され、後で終了しました。

        w0 (core 0)               w1 (core 1)                r0
t0         ----                      ---       blocked on occupants.wait() / take
t1      entered put()                ---                    ---         
t2      vacancies.wait()           entered put()            ---
t3      got new_head = 1           vacancies.wait()         ---
t4     <interrupted by OS>         got new_head = 2         ---
t5                                 written 1 at cell[2]     ---
t6                                 occupants.post();        ---
t7                                 exited put()            waked up
t8                                   ---               got new_tail = 1
t9     <still in interrupt>          ---    read 0 from ceil[1]  !! corruption !!
t10     written 1 at cell[1]                           
t11     occupants.post();
t12     exited put()
于 2012-01-05T12:14:10.353 に答える
1

設計の観点から、キュー全体を共有リソースと見なし、単一のミューテックスで保護します。

ライターは次のことを行います。

  1. ミューテックスを取る
  2. キューへの書き込み (インデックスの処理を含む)
  3. ミューテックスを解放する

リーダーは次のことを行います。

  1. ミューテックスを取る
  2. キューからの読み取り (インデックスの処理を含む)
  3. ミューテックスを解放する
于 2012-01-05T09:15:37.827 に答える
0

私には理論があります。これは循環キューであるため、1 つの読み取りスレッドがラップされている可能性があります。リーダーがインデックス 0 を取るとします。何かを実行する前に、CPU を失います。別のリーダー スレッドがインデックス 1、次に 2、次に 3 ... 次に 7、そして 0 を取得します。最初のリーダーが起動し、両方のスレッドがインデックス 0 への排他的アクセス権を持っていると認識します。それを証明する方法がわかりません。それが役立つことを願っています。

于 2012-01-05T08:21:53.430 に答える