4

私は、ブロッキング キューとロックフリー キューを認識しています。これは、スコットらによって提供されている実装の好例です。、しかし、ロックフリーブロッキングキューの実装はありますか?

lock-free-blocking-queue では、dequeue はロックを必要としませんが、キューにアイテムがない場合、コンシューマーをブロックします。そのような獣の実装はありますか? それらが C# 実装であることが望ましいですが、どの実装でも技術的には機能します。

アップデート:

D14.1 行で競合状態になると思います。

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–&gt;next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–&gt;Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–&gt;Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–&gt;Head // Read Head
D3:         tail = Q–&gt;Tail // Read Tail
D4:         next = head–&gt;next // Read Head.ptr–&gt;next
D5:         if head == Q–&gt;Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–&gt;Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–&gt;value
D13:                if CAS(&Q–&gt;Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded
4

2 に答える 2

1

編集:

シンプル: キューに頭と尻尾は必要ないことをお勧めします。頭があります。head = NULLの場合、リストは空です。頭にアイテムを追加します。頭からアイテムを削除します。よりシンプルで少ないCAS操作。

ヘルパー: コメントで、レースを処理するためのヘルパースキームを考える必要があることを提案しました。「ロックフリー」が意味する私のバージョンでは、問題が発生しない場合は、まれな競合状態が発生しても問題ありません。アイドル状態のスレッドを数ミリ秒長くスリープさせるよりも、パフォーマンスが向上するのが好きです。

ヘルパーのアイデア。消費者が仕事をつかむとき、それは昏睡状態に糸があるかどうか見ることができます。プロデューサーが作業を追加すると、コマ内のスレッドを探すことができます。

したがって、枕木を追跡します。枕木のリンクリストを使用します。スレッドが作業がないと判断すると、スレッドは自分自身を!awakeとしてマークし、CAS自体がスリーパーリストの先頭になります。ウェイクアップする信号を受信すると、スレッドは自分自身をアウェイクとしてマークします。次に、新しく目覚めたスレッドがスリーパーリストをクリーンアップします。同時単一リンクリストをクリーンアップするには、注意する必要があります。あなたは頭にのみCASすることができます。したがって、スリーパーリストの先頭が目覚めているとマークされている間は、先頭をCASでオフにすることができます。頭が起きていない場合は、リストをスキャンし続け、残りの起きているアイテムを「怠惰なリンク解除」(私はその用語を作成しました)します。怠惰なリンク解除は簡単です...前のアイテムの次のptrを目覚めているアイテムの上に設定するだけです。同時スキャンは、!awakeしているアイテムに到達した場合でも、リストの最後に到達します。後続のスキャンでは、短いリストが表示されます。ついに、仕事を追加したり、仕事をやめたりするときはいつでも、スリーパーリストをスキャンして!awakeアイテムを探してください。消費者がいくつかの作業を取得した後に作業が残っていることに気付いた場合(.next work!= NULL)、消費者はスリーパーリストをスキャンして、!awakeである最初のスレッドに信号を送ることができます。プロデューサーが作業を追加した後、プロデューサーはスリーパーリストをスキャンして同じことを行うことができます。

ブロードキャストシナリオがあり、単一のスレッドに信号を送ることができない場合は、スリープ状態のスレッドの数を数え続けるだけです。そのカウントがまだ>0である間、残りの作業に気付いた消費者と作業を追加した消費者は、ウェイクアップするための信号をブロードキャストします。

私たちの環境では、SMTごとに1つのスレッドがあるため、スリーパーリストがこれほど大きくなることはありません(新しい128の同時スレッドマシンの1つを手に入れない限り)トランザクションの早い段階で作業項目を生成します。最初の1秒間に、10,000の作業項目が生成される可能性があり、この生産は急速に減少します。スレッドは、これらの作業項目で数秒間機能します。したがって、アイドルプールにスレッドが存在することはめったにありません。

あなたはまだロック を使うことができますあなたが1つのスレッドしか持っておらず、めったに仕事を生成しないなら...これはあなたのために働きません。その場合、ミューテックスのパフォーマンスは問題ではないので、ミューテックスを使用する必要があります。このシナリオでは、スリーパーキューのロックを使用します。ロックフリーは、「重要な場所にロックがない」と考えてください。

前の投稿: あなたは言っていますか:仕事のキューがあります。多くのコンシューマースレッドがあります。コンシューマーは、作業をプルして、作業がある場合はそれを実行する必要があります。コンシューマースレッドは、作業が行われるまでスリープする必要があります。

もしそうなら、私たちはこのように不可分操作のみを使用してこれを行います:

作業のキューはリンクリストです。スリープ状態のスレッドのリンクリストもあります。

作品を追加するには:リストの先頭を新しい作品にCASします。作業が追加されると、スリーパーリストにスレッドがあるかどうかを確認します。ある場合は、ワークを追加する前に、スリーパーをスリーパーリストからCASし、そのワーク=新しいワークを設定してから、スリーパーにウェイクアップするように通知します。作業を作業キューに追加します。

作業を消費するには:リストの先頭をhead->nextにCASします。ワークリストの先頭がNULLの場合、スレッドを枕木のリストにCASします。

スレッドにワークアイテムがあると、スレッドはワークアイテムの状態をWORK_INPROGRESSなどにCASする必要があります。それが失敗した場合、それは作業が別の人によって実行されていることを意味するので、コンシューマースレッドは作業の検索に戻ります。スレッドがウェイクアップして作業項目がある場合でも、状態をCASする必要があります。

したがって、仕事が追加されると、眠っている消費者は常に目を覚まして仕事を手渡します。pthread_kill()は、常にsigwait()でスレッドをウェイクアップします。これは、シグナルの後でスレッドがsigwaitに到達した場合でも、シグナルが受信されるためです。これにより、スレッドがスリーパーリストに追加されても、スリープ状態になる前にシグナルが送信されるという問題が解決されます。発生するのは、スレッドがその作業を所有しようとすることだけです。作業を所有できないか、作業がない場合、スレッドは消費開始に戻ります。スレッドがスリーパーリストへのCASに失敗した場合は、別のスレッドがそのスレッドを打ち負かしたか、プロデューサーがスリーパーを削除したことを意味します。安全のため、スレッドは目覚めたばかりのように動作します。

これを行うと競合状態が発生せず、複数のプロデューサーとコンシューマーが存在します。また、これを拡張して、スレッドが個々の作業項目でもスリープできるようにすることができました。

于 2011-04-20T20:59:29.413 に答える
1

.NET 並列拡張: (ビルトイン、.NET 4.0+ 用):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx


StackOverflow の実装の誰か:

.net のフリー コンストラクトをロックする



コメントでの説明への対応:

空のブロックがビジーでない (シグナルを待機する) 場合は、待機するカウント セマフォが必要なようです。

別のアプローチとして、通常のキューをアトミックな比較および交換またはスピン ロックと共に使用して同時アクセスを防止することができます。
キューが空のときにコンシューマ スレッドが入力しようとすると、バイナリ セマフォをロック
し、プロバイダ スレッドがキューに入力しようとすると、バイナリ セマフォをロックします。が空の場合、バイナリ セマフォのロックを解除して、すべてのスリーパー コンシューマーを目覚めさせます (そして、それらをスピン ロックに戻して、キューに十分な項目がある場合にのみ複数のスレッドが入ることができるようにします)。

例 // 疑似コード

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}
于 2011-04-20T18:01:16.767 に答える