待機中のスレッドの空のリンクリストから始めます。ヘッドは0に設定する必要があります。
CAS、コンペアアンドスワップを使用して、ウェイターのリストの先頭にスレッドを挿入します。ヘッド=-1の場合は、挿入したり待機したりしないでください。正しく行えば、CASを使用してリンクリストの先頭にアイテムを安全に挿入できます。
挿入後、待機中のスレッドはSIGUSR1で待機する必要があります。これを行うには、sigwait()を使用します。
準備ができると、シグナリングスレッドはCASを使用して待機リストの先頭を-1に設定します。これにより、それ以上のスレッドが待機リストに追加されるのを防ぎます。次に、シグナリングスレッドは待機リスト内のスレッドを繰り返し、pthread_kill(&thread、SIGUSR1)を呼び出して、待機中の各スレッドをウェイクアップします。
sigwaitを呼び出す前にSIGUSR1が送信された場合、sigwaitはすぐに戻ります。したがって、待機リストにスレッドを追加してからsigwaitを呼び出すまでの間に競合は発生しません。
編集:
CASがミューテックスよりも速いのはなぜですか?素人の答え(私は素人です)。レースがないときはオーバーヘッドが低いため、状況によっては高速になります。したがって、同時発生する問題を8-16-32-64-128ビットの連続メモリを変更する必要があるまで減らすことができ、競合があまり発生しない場合は、CASが勝ちます。CASは基本的に、通常の「mov」を実行しようとしていた場所で、もう少し凝った/高価なmov命令です。その「ロック交換」またはそのようなもの。
一方、ミューテックスは余分なものがたくさんあり、他のキャッシュラインを汚し、より多くのメモリバリアなどを使用します。CASはx86、x64などでメモリバリアとして機能しますが、もちろんロックを解除する必要がありますおそらくほぼ同じ量の余分なものであるミューテックス。
CASを使用してリンクリストにアイテムを追加する方法は次のとおりです。
while (1)
{
pOldHead = pHead; <-- snapshot of the world. Start of the race.
pItem->pNext = pHead;
if (CAS(&pHead, pOldHead, pItem)) <-- end of the race if phead still is pOldHead
break; // success
}
では、コードのCAS行に同時に複数のスレッドが含まれる頻度はどれくらいだと思いますか?実際には....あまり頻繁ではありません。複数のスレッドで数百万のアイテムを同時に追加するループを実行したテストを実行しましたが、発生する時間は1%未満です。実際のプログラムでは、それは決して起こらないかもしれません。
明らかに、レースがある場合は、戻ってそのループを再度実行する必要がありますが、リンクリストの場合、それはどのくらいの費用がかかりますか?
欠点は、そのメソッドを使用してアイテムをヘッドに追加する場合、そのリンクリストに対して非常に複雑なことを実行できないことです。二重リンクリストを実装してみてください。なんて痛い。
編集:
上記のコードでは、マクロCASを使用しています。Linuxを使用している場合、CAS=__sync_bool_compare_and_swapを使用するマクロ。gccアトミックビルトインを参照してください。Windowsを使用している場合、CAS=InterlockedCompareExchangeなどを使用したマクロ。Windowsのインライン関数は次のようになります。
inline bool CAS(volatile WORD* p, const WORD nOld, const WORD nNew) {
return InterlockedCompareExchange16((short*)p, nNew, nOld) == nOld;
}
inline bool CAS(volatile DWORD* p, const DWORD nOld, const DWORD nNew) {
return InterlockedCompareExchange((long*)p, nNew, nOld) == nOld;
}
inline bool CAS(volatile QWORD* p, const QWORD nOld, const QWORD nNew) {
return InterlockedCompareExchange64((LONGLONG*)p, nNew, nOld) == nOld;
}
inline bool CAS(void*volatile* p, const void* pOld, const void* pNew) {
return InterlockedCompareExchangePointer(p, (PVOID)pNew, (PVOID)pOld) == pOld;
}