2

Windowsには条件変数がないため(vista以降に導入されましたが、Windows XPおよび2003ではサポートされていません)、C++でスレッドセーフキューを実装するのは簡単ではありません。Win32にPOSIX条件変数を実装するための戦略。私が必要としたのは、セマフォと条件変数を使用せずに、CriticalSectionまたはMutexとEventを使用することです。

また、win32ネイティブAPIを使用するだけの正確な実装を見つけようとしましたが、うまくいきませんでした。だから私は自分で1つを終えました。問題は、コードがスレッドセーフであるかどうかを100%確信していないことです。誰が私にそれが大丈夫かどうかを言うことができますか?

class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_hGetEvent;
    HANDLE m_hPutEvent;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;

    ::InitializeCriticalSection(&m_lock);
    m_hPutEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_hGetEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();

    ::CloseHandle(m_hGetEvent);
    ::CloseHandle(m_hPutEvent);

    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    ::EnterCriticalSection(&m_lock);

    while(m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
    {
        ::LeaveCriticalSection(&m_lock);

        //wait
        if(::WaitForSingleObject(m_hPutEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        ::EnterCriticalSection(&m_lock);
    }
    if(m_nCapacity > 0)
    {
        ASSERT(m_list.GetCount() < m_nCapacity);
    }
    m_list.AddTail(ptr);

    ::SetEvent(m_hGetEvent);    //notifyAll
    ::LeaveCriticalSection(&m_lock);
}
void* CEventSyncQueue::Get()
{
    ::EnterCriticalSection(&m_lock);

    while(m_list.IsEmpty())
    {
        ::LeaveCriticalSection(&m_lock);

        //wait
        if(::WaitForSingleObject(m_hGetEvent, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        ::EnterCriticalSection(&m_lock);
    }
    ASSERT(!m_list.IsEmpty());
    void* ptr = m_list.RemoveHead();

    ::SetEvent(m_hPutEvent);    //notifyAll
    ::LeaveCriticalSection(&m_lock);

    return ptr;
}
4

3 に答える 3

1

Windowsでスレッドセーフキューを実装するのは簡単です。私はDelphi、C ++、BCBなどでそれを行いました。

なぜ条件変数が必要だと思いますか?Windowsメッセージキューはどのように機能すると思いますか?

イベントは、PCキューに使用するための間違ったプリミティブです。最も簡単で明確な方法は、セマフォを使用することです。

単純な無制限の生産者/消費者キュー。

template <typename T> class PCSqueue{
    CRITICAL_SECTION access;
    deque<T> *objectQueue;
    HANDLE queueSema;
public:
    PCSqueue(){
        objectQueue=new deque<T>;
        InitializeCriticalSection(&access);
        queueSema=CreateSemaphore(NULL,0,MAXINT,NULL);
    };
    void push(T ref){
        EnterCriticalSection(&access);
        objectQueue->push_front(ref);
        LeaveCriticalSection(&access);
        ReleaseSemaphore(queueSema,1,NULL);
    };
    bool pop(T *ref,DWORD timeout){
        if (WAIT_OBJECT_0==WaitForSingleObject(queueSema,timeout)) {
            EnterCriticalSection(&access);
            *ref=objectQueue->back();
            objectQueue->pop_back();
            LeaveCriticalSection(&access);
            return(true);
        }
        else
            return(false);
    };
};

編集-制限されたキューはそれほど難しくありません-空のスペースを数えるために別のsemaphreが必要です。制限付きキューは使用しませんが、問題はないと確信しています。2つのセマフォとミューテックス/CSを備えた制限付きキューが標準パターンです。

編集:PostMessage()またはPostThreadMessage()API呼び出しを使用します-これらは「waveOutProc」コールバックから安全であると明示的に宣言されています。MSDNによると、「他の波動関数」を呼び出すとデッドロックが発生します。セマフォ呼び出しはそのセットに含まれていません。SetEvent()が許可されていても、ReleaseSemaphore()が許可されていなかった場合は、非常に驚​​きます。実際、WindowsではReleaseSemaphore()がどこにも存在しないのに、SetEvent()が許可されていたら、私は驚きます。

于 2012-07-29T06:20:43.357 に答える
0

条件変数?インターロック*機能のことですか?これらは長い間存在していました。私はWindows2000で使用しました。同時実行システムを構築するために使用できますが、それでも自分で少し作業を行う必要があります。

または、OpenMPを試してください。これを使用するには、VisualStudio2008以降が必要です。

于 2012-07-30T00:55:01.707 に答える
0

考え直してみると、セマフォを明示的に実装する必要はほとんどありません。代わりに、イベントを使用してセマフォを実装する方法を考え、その方法で問題にアプローチしてください。私の最初の試みは手動リセットイベントを使用しましたが、これは非効率的でしたが明らかに正しいものでした。その後、最適化しました。

これらのコードフラグメントのいずれもデバッグ(またはコンパイル)していないことに注意してください。ただし、正しいアイデアが得られるはずです。手動リセットバージョンは次のとおりです。

class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    m_queue_not_empty = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, TRUE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is full and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            // The queue is definitely not empty.
            SetEvent(m_queue_not_empty);
            // Check whether the queue is now full.
            if (m_nCapacity > 0 && m_list.GetCount() >= m_nCapacity)
            {
                ResetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, we might not be the first to respond to the event,
        // so we still need to check whether the queue is empty and loop
        // if it is.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            ASSERT(m_nCapacity <= 0 || m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            // Check whether the queue is now empty.
            if (m_list.IsEmpty())
            {
                ResetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

そして、これがより効率的な自動リセットイベントバージョンです。

class CEventSyncQueue
{
public:
    CEventSyncQueue(int nCapacity = -1);
    virtual ~CEventSyncQueue();
    virtual void Put(void* ptr);
    virtual void* Get();
protected:
    int m_nCapacity;
    CPtrList m_list;

    CRITICAL_SECTION m_lock;    
    HANDLE m_queue_not_empty;
    HANDLE m_queue_not_full;
};

CEventSyncQueue::CEventSyncQueue(int nCapacity)
{
    m_nCapacity = nCapacity;
    ::InitializeCriticalSection(&m_lock);
    m_queue_not_empty = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_queue_not_full = ::CreateEvent(NULL, FALSE, TRUE, NULL);
}

CEventSyncQueue::~CEventSyncQueue()
{
    m_list.RemoveAll();
    ::CloseHandle(m_queue_not_empty);
    ::CloseHandle(m_queue_not_full);
    ::DeleteCriticalSection(&m_lock);
}

void CEventSyncQueue::Put(void* ptr)
{
    if (m_nCapacity <= 0)
    {
        ::EnterCriticalSection(&m_lock);
        m_list.AddTail(ptr);
        SetEvent(m_queue_not_empty);
        ::LeaveCriticalSection(&m_lock);
        return;
    }

    bool done = false;
    while (!done)
    {
        // If the queue is full, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_full, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, under some (rare) conditions we'll get here and find
        // the queue is already full again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (m_list.GetCount() < m_nCapacity)
        {
            m_list.AddTail(ptr);
            done = true;
            SetEvent(m_queue_not_empty);
            if (m_list.GetCount() < m_nCapacity)
            {
                SetEvent(m_queue_not_full);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}

void* CEventSyncQueue::Get()
{
    void *result = nullptr;
    while (result == nullptr)
    {
        // If the queue is empty, we must wait until it isn't.
        if(::WaitForSingleObject(m_queue_not_empty, INFINITE) != WAIT_OBJECT_0)
        {
            ASSERT(FALSE);
        }

        // However, under some (rare) conditions we'll get here and find
        // the queue is already empty again, so be prepared to loop.
        ::EnterCriticalSection(&m_lock);
        if (!m_list.IsEmpty())
        {
            result = m_list.RemoveHead();
            ASSERT(result != nullptr);
            // The queue shouldn't be full at this point!
            if (m_nCapacity <= 0) ASSERT(m_list.GetCount() < m_nCapacity);
            SetEvent(m_queue_not_full);
            if (!m_list.IsEmpty())
            {
                SetEvent(m_queue_not_empty);
            }
        }
        ::LeaveCriticalSection(&m_lock);
    }
}
于 2012-07-31T04:53:35.917 に答える