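I have recently been reading Paul McKenney's 2010 white paper, "Memory Barriers: a Hardware View for Software Hackers".
I would very much appreciate feedback, comments, or guidance on the small section of C code below, which implements the M&S (Michael and Scott) queue enqueue function, particularly with regard to memory barriers and compiler barriers.
The code uses pointer-and-counter pairs to handle ABA and, for the purposes of this post, should be considered to be written for x86/x64 only.
The inline comments were all written just now, for this post, and are part of the post in that they express what I currently think is going on.
For brevity, I have stripped the asserts, the cache-line padding in the structures, and so on.
At the moment I think the code is quite broken, but I want to make sure I think so for the right reasons.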
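For readers unfamiliar with the pointer-and-counter technique mentioned above, here is a minimal sketch of why the counter defeats ABA. It is not the code under discussion: to stay portable it packs a 32-bit slot index (standing in for the pointer) and a 32-bit counter into a single 64-bit word and uses C11 `<stdatomic.h>`, whereas the listing below uses two pointer-sized words and a double-word CAS. The names `pac_t`, `pac_pack`, `pac_index`, `pac_counter`, `pac_swap` and `pac_aba_demo` are all hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical packed pair: the low 32 bits hold a slot index (standing in
   for the pointer), the high 32 bits hold a modification counter. */
typedef _Atomic uint64_t pac_t;

static uint64_t pac_pack(uint32_t index, uint32_t counter)
{
    return ((uint64_t) counter << 32) | index;
}

static uint32_t pac_index(uint64_t pac)   { return (uint32_t) pac; }
static uint32_t pac_counter(uint64_t pac) { return (uint32_t) (pac >> 32); }

/* Install new_index only if BOTH the index and the counter still match the
   snapshot in `expected`. Every successful swap increments the counter. */
static bool pac_swap(pac_t *target, uint64_t expected, uint32_t new_index)
{
    uint64_t desired = pac_pack(new_index, pac_counter(expected) + 1);
    return atomic_compare_exchange_strong(target, &expected, desired);
}

/* Replays an A -> B -> A history and returns whether a CAS based on the
   stale snapshot (taken before the history) succeeds. */
static bool pac_aba_demo(void)
{
    pac_t head;
    bool ok;

    atomic_init(&head, pac_pack(1, 0));
    uint64_t stale = atomic_load(&head);          /* index 1, counter 0 */

    ok = pac_swap(&head, atomic_load(&head), 2);  /* A -> B, counter 1 */
    assert(ok);
    ok = pac_swap(&head, atomic_load(&head), 1);  /* B -> A, counter 2 */
    assert(ok);

    /* The index is 1 again, but the counter has moved on, so a swap that
       still believes in the stale snapshot is rejected. */
    assert(pac_index(atomic_load(&head)) == pac_index(stale));
    return pac_swap(&head, stale, 3);
}
```

With a bare index (or pointer) the final swap would succeed, because the value has returned to what the stale snapshot saw; with the counter included in the comparison it fails, which is the property the pointer/counter pairs in the listing rely on.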
#define PAC_SIZE 2
struct lfds_queue_element
{
struct lfds_queue_element
*volatile next[PAC_SIZE];
void
*user_data;
};
struct lfds_queue_state
{
struct lfds_queue_element
*volatile enqueue[PAC_SIZE];
struct lfds_queue_element
*volatile dequeue[PAC_SIZE];
atom_t volatile
aba_counter;
};
void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
*local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];
unsigned char cas_result = 0;
unsigned int backoff_iteration = 1;
/* TRD : here we have been passed a new element to place
into the queue; we initialize it and its next
pointer/counter pair
*/
local_lqe[POINTER] = lqe;
local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );
local_lqe[POINTER]->next[POINTER] = NULL;
local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );
/* TRD : now, I think there is an issue here, in that these values
are by no means yet necessarily visible to other cores
however, they only need to be visible once
the element has entered the queue, and for that
to happen, the contiguous double-word CAS must
have occurred - and on x86/x64, this carries
with it an mfence
however, that mfence will only act to empty our
local store buffer - it will not cause other cores
to flush their invalidation queues, so surely
it can all still go horribly wrong?
ah, but if all other cores are only accessing
these variables using atomic operations, they
too will be issuing mfences and so at that
point flushing their invalidate queues
*/
do
{
enqueue[COUNTER] = lqs->enqueue[COUNTER];
enqueue[POINTER] = lqs->enqueue[POINTER];
next[COUNTER] = enqueue[POINTER]->next[COUNTER];
next[POINTER] = enqueue[POINTER]->next[POINTER];
/* TRD : now, this is interesting
we load the enqueue pointer and its next pointer
we then (immediately below) check to see they're unchanged
but this check is totally bogus! we could be reading
old values from our cache, where our invalidate queue
has not been processed, so the initial read contains
old data *and* we then go ahead and check from our cache
the same old values a second time
what's worse is that I think we could also read the correct
values for enqueue but an incorrect (old) value for its
next pointer...!
so, in either case, we easily mistakenly pass the if()
and then enter into code which does things to the queue
now, in both cases, the CAS will mfence, which will
cause us to see from the data structure the true
values, but how much will that help us - we need
to look to see what is actually being done
the if() checks next[POINTER] is NULL
if we have read a NULL for next, then we think
the enqueue pointer is correctly placed (it's not
lagging behind) so we don't need to help; we then
try to add our element to the end of the queue
now, it may be we have read enqueue properly but
next improperly and so we now try to add our element
where it will in fact truncate the queue!
the CAS however will mfence and so at this point
we will actually see the correct value for enqueue-next,
and this will prevent that occurring
if we look now at the else clause, here we have seen
that enqueue->next is not NULL, so the enqueue pointer
is out of place and we need to help, which we do by
moving it down the queue
here though we could have read enqueue correctly
but next incorrectly; the CAS will mfence, which will
update the cache, but since we're only comparing
the enqueue pointer with our copy of the enqueue
pointer, the fact our next pointer is wrong won't
change! so here, we move the enqueue pointer to
some old element - which although it might be in the
queue (so this would be an inefficiency, you'd have
to do a bunch more queue walking to get the enqueue
pointer to the final element) it might not be, too!
it could in the meantime have been dequeued and
that of course would be death
*/
if( lqs->enqueue[POINTER] == enqueue[POINTER] && lqs->enqueue[COUNTER] == enqueue[COUNTER] )
{
if( next[POINTER] == NULL )
{
local_lqe[COUNTER] = next[COUNTER] + 1;
cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
}
else
{
next[COUNTER] = enqueue[COUNTER] + 1;
lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
}
}
}
while( cas_result == 0 );
local_lqe[COUNTER] = enqueue[COUNTER] + 1;
lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );
return;
}
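For comparison, here is a hedged sketch of the same enqueue shape written against C11 `<stdatomic.h>`, so that the ordering requirements are stated in the source rather than relying on `volatile` and on the side effects of the CAS. It is a simplification under stated assumptions, not a drop-in replacement: it drops the ABA counters, which is only safe if nodes are never freed or reused while the queue is live, and the names `struct node`, `struct queue`, `queue_init` and `queue_enqueue` are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct node {
    _Atomic(struct node *) next;
    void *user_data;
};

struct queue {
    _Atomic(struct node *) head;  /* always points at a dummy node */
    _Atomic(struct node *) tail;  /* last node, or one node behind it */
};

/* The queue starts with a caller-supplied dummy node, as in Michael and Scott. */
static void queue_init(struct queue *q, struct node *dummy)
{
    atomic_init(&dummy->next, NULL);
    dummy->user_data = NULL;
    atomic_init(&q->head, dummy);
    atomic_init(&q->tail, dummy);
}

static void queue_enqueue(struct queue *q, struct node *n, void *user_data)
{
    assert(q != NULL && n != NULL);

    /* Plain initialisation; the successful CAS below publishes these stores. */
    n->user_data = user_data;
    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);

    for (;;) {
        /* Default (sequentially consistent) atomic loads: the compiler may
           not reorder or cache them. */
        struct node *tail = atomic_load(&q->tail);
        struct node *next = atomic_load(&tail->next);

        if (tail != atomic_load(&q->tail))
            continue;                   /* tail moved under us, retry */

        if (next == NULL) {
            /* Tail looks current: try to link the new node after it. */
            if (atomic_compare_exchange_weak(&tail->next, &next, n)) {
                /* Swing tail forward; failure means another thread helped. */
                atomic_compare_exchange_strong(&q->tail, &tail, n);
                return;
            }
        } else {
            /* Tail is lagging: help by advancing it, then retry. */
            atomic_compare_exchange_strong(&q->tail, &tail, next);
        }
    }
}
```

In this form the helping CAS in the else branch uses the `next` value that was loaded from `tail->next` itself, and it only succeeds if `q->tail` still equals the `tail` we read. On x86/x64, compilers typically emit ordinary `mov` instructions for these atomic loads and `lock cmpxchg` for the compare-and-swap operations, so the sketch adds no fences beyond those the original listing already pays for.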