私は、pthreads、semaphore.h、および gcc アトミック ビルトインの上にある循環バッファーに支えられた高性能ブロッキング キューを実装しようとしています。キューは、異なるスレッドからの複数の同時読み取りおよび書き込みを処理する必要があります。
ある種の競合状態を分離しましたが、それがアトミック操作とセマフォの動作に関する誤った仮定なのか、それとも私の設計に根本的な欠陥があるのか はわかりません。
それを抽出して、以下のスタンドアロンの例に単純化しました。このプログラムが二度と戻ってこないことを期待しています。ただし、キューで破損が検出された数十万回の反復後に戻ります。
以下の例 (解説用) では、実際には何も格納されず、実際のデータを保持するセルを 1 に設定し、空のセルを表すために 0 を設定します。空いているセルの数を表すカウンティング セマフォ (Vacancy) と、占有されているセルの数を表すカウンティング セマフォ (Occupants) があります。
ライターは次のことを行います。
- 空室を減らす
- アトミックに次のヘッド インデックスを取得します (mod キュー サイズ)
- それに書き込みます
- 占有者を増やす
読者は反対のことをします:
- 占有者を減らす
- 次のテール インデックスをアトミックに取得する (mod キュー サイズ)
- そこから読む
- 空室を増やす
上記を考えると、正確に1つのスレッドが特定のセルを一度に読み書きできると思います。
機能しない理由やデバッグ戦略についてのアイデアを歓迎します。以下のコードと出力...
#include <stdlib.h>
#include <semaphore.h>
#include <iostream>
using namespace std;
#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2
struct CountingSemaphore
{
sem_t m;
CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
void post() { sem_post(&m); }
void wait() { sem_wait(&m); }
~CountingSemaphore() { sem_destroy(&m); }
};
struct BlockingQueue
{
unsigned int head; // (head % capacity) is next head position
unsigned int tail; // (tail % capacity) is next tail position
CountingSemaphore vacancies; // how many cells are vacant
CountingSemaphore occupants; // how many cells are occupied
int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant
BlockingQueue() :
head(0),
tail(0),
vacancies(QUEUE_CAPACITY),
occupants(0)
{
for (size_t i = 0; i < QUEUE_CAPACITY; i++)
cell[i] = 0;
}
// put an item in the queue
void put()
{
vacancies.wait();
// atomic post increment
set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);
occupants.post();
}
// take an item from the queue
void take()
{
occupants.wait();
// atomic post increment
get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);
vacancies.post();
}
// set cell i
void set(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
{
corrupt("set", i);
exit(-1);
}
}
// get cell i
void get(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
{
corrupt("get", i);
exit(-1);
}
}
// corruption detected
void corrupt(const char* action, unsigned int i)
{
static CountingSemaphore sem(1);
sem.wait();
cerr << "corruption detected" << endl;
cerr << "action = " << action << endl;
cerr << "i = " << i << endl;
cerr << "head = " << head << endl;
cerr << "tail = " << tail << endl;
for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
cerr << "cell[" << j << "] = " << cell[j] << endl;
}
};
BlockingQueue q;
// keep posting to the queue forever
void* Source(void*)
{
while (true)
q.put();
return 0;
}
// keep taking from the queue forever
void* Sink(void*)
{
while (true)
q.take();
return 0;
}
int main()
{
pthread_t id;
// start some pthreads to run Source function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Source, 0))
abort();
// start some pthreads to run Sink function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Sink, 0))
abort();
while (true);
}
上記を次のようにコンパイルします。
$ g++ -pthread AboveCode.cpp
$ ./a.out
出力は毎回異なりますが、一例を次に示します。
corruption detected
action = get
i = 6
head = 122685
tail = 122685
cell[0] = 0
cell[1] = 0
cell[2] = 1
cell[3] = 0
cell[4] = 1
cell[5] = 0
cell[6] = 1
cell[7] = 1
私のシステムは Intel Core 2 の Ubuntu 11.10 です:
$ uname -a
Linux 3.0.0-14-generic #23-Ubuntu SMP \
Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ cat /proc/cpuinfo | grep Intel
model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
$ g++ --version
g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1
ありがとう、アンドリュー。