1

整数を保持するための同期キューを作成しましたが、理解できない奇妙な競合状態に直面しています。

解決策を投稿しないでください。コードを修正して機能させる方法を知っています。競合状態とは何か、意図したとおりに機能しない理由を知りたいです。何がうまくいかないのか、その理由を理解するのを手伝ってください。

まず、コードの重要な部分:

これは、アプリケーションがバッファーが保持できる以上のデータを入れないことを前提としているため、現在のバッファー サイズはチェックされません。

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) { // 0 values are not allowed to be put in
        size_t write_offset; // holds a current copy of the array index where to put the element
        for (;;) {
            // retrieve up to date write_offset copy and apply power-of-two modulus
            write_offset = int_queue->write_offset & int_queue->modulus; 
            // if that cell currently holds 0 (thus is empty)
            if (!int_queue->int_container[write_offset])
                // Appetmt to compare and swap the new value in
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    // if successful then this thread was the first do do this, terminate the loop, else try again
                    break;
        }

        // increment write offset signaling other threads where the next free cell is
        int_queue->write_offset++;
        // doing a synchronised increment here does not fix the race condition
    }
}

これには、write_offset. RedHat 2.6.32 Intel(R) Xeon(R) 上の OS X gcc 4.2、Intel Core i5 クアッドコア、および Linux Intel C Compiler 12 でテスト済み。どちらも競合状態を引き起こします。

テストケース付きの完全なソース:

#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdint.h>

// #include "int_queue.h"
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>

#ifndef INT_QUEUE_H
#define INT_QUEUE_H

#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif

struct int_queue_s {
    size_t size;
    size_t modulus;
    volatile size_t read_offset;
    volatile size_t write_offset;
    volatile long int int_container[0];
};

static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) {
        int_queue->int_container[int_queue->write_offset & int_queue->modulus] = value;
        int_queue->write_offset++;
    }
}

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) {
        size_t write_offset;
        for (;;) {
            write_offset = int_queue->write_offset & int_queue->modulus;
            if (!int_queue->int_container[write_offset])
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    break;
        }

        int_queue->write_offset++;
    }
}

static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) {
    size_t read_offset = int_queue->read_offset & int_queue->modulus;
    if (int_queue->write_offset != int_queue->read_offset) {
        const long int value = int_queue->int_container[read_offset];
        int_queue->int_container[read_offset] = 0;
        int_queue->read_offset++;
        return value;
    } else
        return 0;
}

static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) {
    size_t read_offset;
    long int volatile value;
    for (;;) {

        read_offset = int_queue->read_offset;
        if (int_queue->write_offset == read_offset)
            return 0;
        read_offset &= int_queue->modulus;
        value = int_queue->int_container[read_offset];
        if (value)
            if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0))
                break;
    }
    int_queue->read_offset++;
    return value;
}

static inline struct int_queue_s * int_queue_create(size_t num_values) {

    struct int_queue_s * int_queue;
    size_t modulus;
    size_t temp = num_values + 1;
    do {
        modulus = temp;
        temp--;
        temp &= modulus;
    } while (temp);
    modulus <<= 1;

    size_t int_queue_mem = sizeof(*int_queue) + ( sizeof(int_queue->int_container[0]) * modulus);

    if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE));

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0);
    if (int_queue == MAP_FAILED)
        return NULL;

    int_queue->modulus = modulus-1;
    int_queue->read_offset = 0;
    int_queue->write_offset = 0;
    int_queue->size = num_values;

    memset((void*)int_queue->int_container, 0, sizeof(int_queue->int_container[0]) * modulus);

    size_t i;
    for (i = 0; i < num_values; ) {
        int_queue_put(int_queue, ++i );
    }

    return int_queue;
}


#endif


void * test_int_queue_thread(struct int_queue_s * int_queue) {
    long int value;

    size_t i;

    for (i = 0; i < 10000000; i++) {


        int waited = -1;
        do {
            value = int_queue_get_sync(int_queue);
            waited++;
        } while (!value);

        if (waited > 0) {
            printf("waited %d cycles to get a new value\n", waited);
            // continue;
        }

        // else {
        printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i);
        // }

        int timesleep = rand();
        timesleep &= 0xFFF;

        usleep(timesleep);

        int_queue_put_sync(int_queue, value);

        printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i);
    }

    return NULL;
}


int main(int argc, char ** argv) {
    struct int_queue_s * int_queue = int_queue_create(2);

    if (!int_queue) {
        fprintf(stderr, "error initializing int_queue\n");
        return -1;
    }

    srand(0);

    long int value[100];

    size_t i;

    for (i = 0; i < 100; i++) {
        value[0] = int_queue_get(int_queue);

        if (!value[0]) {
            printf("error getting value\n");
        }
        else {
            printf("got value %ld\n", value[0]);
        }

        int_queue_put(int_queue, value[0]);

        printf("put value %ld back successfully\n", value[0]);
    }

    pthread_t threads[100];
    for (i = 0; i < 4; i++) {
        pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue);
    } 

    for (i = 0; i < 4; i++) {
        pthread_join(threads[i], NULL);
    } 


    return 0;
}
4

3 に答える 3

5

興味深い質問です。これは勝手な推測です。:-)

read_offset と write_offset の間に同期が必要なようです。

たとえば、これは関連するかどうかにかかわらず人種です。コンペア アンド スワップと write_offset のインクリメントの間に、リーダーが入ってきて値をゼロに戻すことがあります。

Writer-1: get write_offset=0
Writer-2: get write_offset=0
Writer-1: compare-and-swap at offset=0
Writer-1: Set write_offset=1
Reader-1: compare-and-swap at offset=0 (sets it back to zero)
Writer-2: compare-and-swap at offset=0 again even though write_offset=1
Writer-2: Set write_offset=2
于 2012-11-26T19:20:56.200 に答える
0

それが問題だと思いますint_queue->write_offset++;。2つのスレッドがこの命令を同時に実行すると、両方ともメモリから同じ値をロードしてインクリメントし、同じ結果を保存します(変数が1つだけ増えるように)。

于 2012-11-26T18:52:44.097 に答える
-1

私の意見は

int_queue->write_offset++;

write_offset = int_queue->write_offset & int_queue->modulus; 

スレッドセーフではありません

于 2012-11-26T19:15:24.467 に答える