0

設計を再考した後、水田からの入力により、このようなものを思いつきましたが、それが正しいかどうか疑問に思います。実行すると問題ないようです...アイデアは、事前に割り当てられたオブジェクトが次から継承されることです:

struct Node
{
    void* pool;
};

このようにして、割り当てられたすべてのオブジェクトに、後で解放するためにそのプールへのポインターを注入します。次に、次のようになります。

template<class T, int thesize>
struct MemPool
{
    T* getNext();
    void free(T* ptr);

    struct ThreadLocalMemPool
    {
        T* getNextTL();
        void freeTL();

        int size;
        vector<T*> buffer;
        vector<int> freeList;
        int freeListIdx;
        int bufferIdx;
        ThreadLocalMemPool* nextTlPool; //within a thread's context a linked list
    };

    int size;
    threadlocal ThreadLocalMemPool* tlPool; //one of these per thread
};

つまり、基本的に私が言うMemPool<Cat, 100>と、スレッドごとにスレッドgetNextsローカルのmempoolをインスタンス化するmempoolを提供します。サイズは、モジュロを簡単にするために内部で最も近い 2 のべき乗に丸められます (簡単にするために省略します)。は各スレッドにローカルであるためgetNext()、ロックは必要ありません。次のように、解放部分にアトミックを使用しようとしています。

T* ThreadLocalMemPool::getNextTL()
{
    int iHead = ++bufferIdx % size;
    int iTail = freeListIdx % size;

    if (iHead != iTail)  // If head reaches tail, the free list is empty.
    {
        int & idx = freeList[iHead];
        while (idx == DIRTY) {}
        return buffer[idx];
    }
    else
    {
        bufferIdx--; //we will recheck next time
        if (nextTLPool)
            return nextTLPool->getNextTL();
        else
            //set nextTLPool to a new ThreadLocalMemPool and return getNextTL() from it..
    }
}

void ThreadLocalMemPool::free(T* ptr)
{
    //the outer struct handles calling this in the right ThreadLocalMemPool

    //we compute the index in the pool from which this pool came from by subtracting from
    //its address the address of the first pointer in this guys buffer
    int idx = computeAsInComment(ptr);

    int oldListIdx = atomic_increment_returns_old_value(freeListIdx);
    freeList[oldListIdx % size] = idx;
}

さて、アイデアは、割り当てた以上に解放することはできないため(正しい使用法を想定しています)、freeListIdx常にプール内の後ろをたどるということです。bufferIdxfree への呼び出しは、バッファ インデックスをフリー リストに返す順序を同期し、getNext はサイクル バックするときにこれを取得します。私はそれについて少し考えてきましたが、論理に意味的に間違っているものは何も見当たりません。

4

1 に答える 1

3

スレッドセーフの問題にはロックが必要です。それを緩和したい場合は、1 つのスレッドだけがプールを使用するという制約が必要です。1 つのスレッドが割り当てを担当し、もう 1 つのスレッドが割り当て解除を担当するという条件で、以下で説明する循環フリー リストを使用する場合、これを 2 つのスレッドに拡張できます。

他の管理なしでベクターを使用することに関しては、それは悪い考えです... 断片化し始めるとすぐに、割り当てがヒットします。

これを実装する良い方法は、T の大きなブロックを割り当てることです。次に、これらのブロックのそれぞれを指すのに十分な大きさの循環キューを作成します。それがあなたの「フリーリスト」です。インデックスの使用を選択することもできます。各プールを 65536 項目に制限する場合、unsigned short を選択してスペースを節約できます (実際には、効率的な循環キュー管理を可能にするために 65535 です)。

循環キューを使用すると、断片化に関係なく一定時間の割り当てと割り当て解除が可能になります。また、いつプールがいっぱいになるか (つまり、空きリストが空になるか) がわかるので、別のプールを作成できます。明らかに、プールを作成するときは、空きリストを埋める必要があります。

したがって、クラスは次のようになります。

template<class T, size_t initSize>
class MemPool
{
    vector<T> poolBuffer;              // The memory pool
    vector<unsigned short> freeList;   // Ring-buffer (indices of free items)
    unsigned short nHead, nTail;       // Ring-buffer management
    int nCount;                        // Number of elements in ring-buffer
    MemPool<T,initSize> *nextPool;     // For expanding memory pool

    // etc...
};

さて、施錠です。アトミック インクリメントおよびデクリメント命令にアクセスでき、十分に注意している場合は、スレッド セーフでフリー リストを維持できます。必要な唯一のミューテックス スタイルのロックは、新しいメモリ プールを割り当てる必要がある場合です。

私は当初の考えを変えました。2 つのアトミック操作が必要であり、キューでの非アトミック操作のためにスピンする予約済みのインデックス値 (0xffff) が必要です。

// I'll have the functions atomic_incr() and atomic_decr().  The assumption here
// is that they do the operation and return the value as it was prior to the
// increment/decrement.  I'll also assume they work correctly for both int and
// unsigned short types.
unsigned short atomic_incr( unsigned short & );
int atomic_incr( int & );
int atomic_decr( int & );

したがって、割り当ては次のようになります。

T* alloc()
{
    // Check the queue size.  If it's zero (or less) we need to pass on
    // to the next pool and/or allocate a new one.
    if( nCount <= 0 ) {
        return alloc_extend();
    }

    int count = atomic_decr(nCount);
    if( count <= 0 ) {
        T *mem = alloc_extend();
        atomic_incr(nCount);     // undo
        return mem;
    }

    // We are guaranteed that there is at least 1 element in the list for us.
    // This will overflow naturally to achieve modulo by 65536.  You can only
    // deal with queue sizes that are a power of 2.  If you want 32768 values,
    // for example, you must do this: head &= 0x7fff;
    unsigned short head = atomic_incr(nHead);

    // Spin until the element is valid (use a reference)
    unsigned short & idx = freeList[head];
    while( idx == 0xffff );

    // Grab the pool item, and blitz the index from the queue
    T * mem = &poolBuffer[idx];
    idx = 0xffff;

    return mem;
};

上記では、新しいプライベート メンバー関数を使用しています。

T * alloc_extend()
{
    if( nextPool == NULL ) {
        acquire_mutex_here();
        if( nextPool == NULL ) nextPool = new MemPool<T>;
        release_mutex_here();
        if( nextPool == NULL ) return NULL;
    }
    return nextPool->alloc();
}

解放したいとき:

void free(T* mem)
{
    // Find the right pool to free from.
    if( mem < &poolBuffer.front() || mem > &poolBuffer.back() )
    {
        if( nextPool ) nextPool->free(mem);
        return;
    }

    // You might want to maintain a bitset that indicates whether the memory has
    // actually been allocated so you don't corrupt your pool here, but I won't
    // do that in this example...

    // Work out the index.  Hope my pointer arithmetic is correct here.
    unsigned short idx = (unsigned short)(mem - &poolBuffer.front());

    // Push index back onto the queue.  As with alloc(), you might want to
    // use a mask on the tail to achieve modulo.
    int tail = atomic_incr(nTail);
    freeList[tail] = idx;

    // Don't need to check the list size.  We assume everything is sane. =)
    atomic_incr(nCount);
}

値 0xffff を事実上 NULL として使用していることに注意してください。この値の設定、クリア、およびスピンは、競合状況を防ぐためにあります。free他のスレッドがalloc. キューは循環しますが、その中のデータはまだ設定されていない可能性があります。

もちろん、インデックスの代わりにポインターを使用することもできます。ただし、これは 4 バイト (または 64 ビット アプリケーションでは 8 バイト) であり、プールしているデータ サイズによっては、メモリ オーバーヘッドがそれほど価値がない場合があります。個人的にはポインターを使用しますが、何らかの理由で、この回答ではインデックスを使用する方が簡単に思えました。

于 2012-09-17T22:34:05.937 に答える