211

ディスラプターのパターンを理解しようとしています。私はInfoQビデオを見て、彼らの論文を読んでみました。リングバッファが関係していることを理解しています。これは、キャッシュの局所性を利用して新しいメモリの割り当てを排除するために、非常に大きな配列として初期化されます。

位置を追跡する1つ以上のアトミック整数があるようです。各「イベント」は一意のIDを取得しているようで、リング内の位置は、リングのサイズなどに対するモジュラスを見つけることによって検出されます。

残念ながら、私はそれがどのように機能するかを直感的に理解していません。私は多くのトレーディングアプリケーションを実行し、アクターモデルを研究し、SEDAなどを調べました。

彼らのプレゼンテーションでは、このパターンは基本的にルーターの動作方法であると述べました。ただし、ルーターがどのように機能するかについての適切な説明も見つかりませんでした。

より良い説明への良い指針はありますか?

4

5 に答える 5

213

Google Code プロジェクトは、リング バッファの実装に関するテクニカル ペーパーを参照していますが、それがどのように機能するかを知りたい人にとっては、少しドライでアカデミックで難しいものです。ただし、より読みやすい方法で内部を説明し始めたブログ投稿がいくつかあります。ディスラプター パターンの核となるリング バッファーの説明、コンシューマー バリア(ディスラプターからの読み取りに関連する部分) の説明、および使用可能な複数のプロデューサーの処理に関するいくつかの情報があります。

ディスラプターの最も簡単な説明は次のとおりです。これは、可能な限り最も効率的な方法でスレッド間でメッセージを送信する方法です。キューの代替として使用できますが、多くの機能を SEDA およびアクターと共有しています。

キューとの比較:

Disruptor は、メッセージを別のスレッドに渡し、必要に応じてスレッドを起動する機能を提供します (BlockingQueue と同様)。ただし、3 つの明確な違いがあります。

  1. Disruptor のユーザーは、Entry クラスを拡張し、事前割り当てを行うファクトリを提供することによって、メッセージの格納方法を定義します。これにより、メモリの再利用 (コピー) が可能になるか、エントリに別のオブジェクトへの参照が含まれる可能性があります。
  2. Disruptor へのメッセージの書き込みは 2 段階のプロセスです。まず、リング バッファーでスロットが要求されます。これにより、適切なデータを入力できるエントリがユーザーに提供されます。次に、エントリをコミットする必要があります。この 2 フェーズのアプローチは、前述のメモリを柔軟に使用できるようにするために必要です。メッセージをコンシューマ スレッドに表示するのはコミットです。
  3. リング バッファから消費されたメッセージを追跡するのは、消費者の責任です。この責任をリング バッファー自体から遠ざけることで、各スレッドが独自のカウンターを保持するため、書き込み競合の量を減らすことができました。

アクターに比べて

Actor モデルは、特に提供されている BatchConsumer/BatchHandler クラスを使用する場合、他のほとんどのプログラミング モデルより Disruptor に近いです。これらのクラスは、消費されたシーケンス番号を維持する複雑さをすべて隠し、重要なイベントが発生したときに一連の単純なコールバックを提供します。ただし、いくつかの微妙な違いがあります。

  1. ディスラプターは 1 スレッド - 1 コンシューマ モデルを使用します。アクタは N:M モデルを使用します。つまり、好きなだけアクタを使用でき、固定数のスレッド (通常はコアごとに 1 つ) に分散されます。
  2. BatchHandler インターフェースは、追加の (そして非常に重要な) callback を提供しますonEndOfBatch()。これにより、I/O を行ってイベントをまとめてバッチ処理し、スループットを向上させるなど、遅いコンシューマが可能になります。他の Actor フレームワークでバッチ処理を行うことは可能ですが、他のほとんどすべてのフレームワークはバッチの最後にコールバックを提供しないため、タイムアウトを使用してバッチの終了を判断する必要があり、結果としてレイテンシが低下します。

セダとの比較

LMAX は、SEDA ベースのアプローチに代わる Disruptor パターンを構築しました。

  1. SEDA に比べて提供された主な改善点は、作業を並行して実行できることです。これを行うために、ディスラプターは複数のコンシューマーへの同じメッセージの (同じ順序での) マルチキャストをサポートしています。これにより、パイプラインでフォーク ステージが不要になります。
  2. また、コンシューマーが他のコンシューマーの結果を待つこともできます。その間に別のキューイング ステージを配置する必要はありません。コンシューマーは、依存しているコンシューマーのシーケンス番号を簡単に監視できます。これにより、パイプラインで結合ステージが不要になります。

メモリバリアとの比較

それについて考える別の方法は、構造化され、順序付けられたメモリバリアとしてです。プロデューサ バリアが書き込みバリアを形成し、コンシューマ バリアが読み取りバリアを形成します。

于 2011-07-03T08:03:40.823 に答える
138

まず、それが提供するプログラミング モデルを理解したいと思います。

1 人以上のライターがいます。1 人以上のリーダーがいます。古いものから新しいものへと完全に並べられたエントリの行があります (写真は左から右)。ライターは、右端に新しいエントリを追加できます。すべてのリーダーは、エントリを左から右に順番に読み取ります。明らかに、読者は過去の作家を読むことはできません。

エントリ削除の概念はありません。エントリが消費されるイメージを避けるために、「コンシューマ」ではなく「リーダー」を使用します。ただし、最後のリーダーの左側のエントリが役に立たなくなることは理解しています。

通常、読者は同時に独立して読むことができます。ただし、リーダー間の依存関係を宣言することはできます。リーダーの依存関係は、任意の非循環グラフにすることができます。リーダー B がリーダー A に依存している場合、リーダー B は過去のリーダー A を読み取ることができません。

リーダー A はエントリに注釈を付けることができ、リーダー B はその注釈に依存するため、リーダーの依存関係が発生します。たとえば、A はエントリに対して何らかの計算を行い、その結果をaエントリのフィールドに格納します。A が先に進むと、B はエントリを読み取ることができ、aA の値が保存されます。リーダー C が A に依存していない場合、C は を読み取ろうとすべきではありませんa

これは確かに興味深いプログラミング モデルです。パフォーマンスに関係なく、モデルだけでも多くのアプリケーションにメリットがあります。

もちろん、LMAX の主な目標はパフォーマンスです。事前に割り当てられたエントリのリングを使用します。リングは十分な大きさですが、設計容量を超えてシステムに負荷がかからないように制限されています。リングがいっぱいの場合、ライターは、最も遅いリーダーが先に進み、空きができるまで待機します。

ガベージ コレクションのコストを削減するために、エントリ オブジェクトは事前に割り当てられ、永久に存続します。新しいエントリ オブジェクトを挿入したり、古いエントリ オブジェクトを削除したりしません。代わりに、ライターは既存のエントリを要求し、そのフィールドにデータを入力して、読者に通知します。この見かけの 2 フェーズ アクションは、実際には単純なアトミック アクションです。

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

エントリの事前割り当ては、隣接するエントリ (可能性が非常に高い) が隣接するメモリ セルに配置されることも意味します。リーダーはエントリを順番に読み取るため、これは CPU キャッシュを利用する上で重要です。

そして、ロック、CAS、さらにはメモリバリアを回避するための多くの努力 (たとえば、ライターが 1 つしかない場合は、不揮発性シーケンス変数を使用する)

リーダーの開発者向け: 書き込みの競合を避けるために、異なる注釈リーダーは異なるフィールドに書き込む必要があります。(実際には、それらは異なるキャッシュラインに書き込む必要があります。) 注釈付きリーダーは、他の非依存リーダーが読み取る可能性のあるものに触れるべきではありません。これが、これらのリーダーがエントリを変更するのではなく、エントリに注釈を付けると私が言う理由です。

于 2011-07-16T05:48:14.353 に答える
41

Martin Fowler は、LMAX とディスラプター パターンについての記事The LMAX Architectureを書いています。

于 2011-07-19T07:49:45.400 に答える
17

私は、純粋な好奇心から、実際のソースを研究するために実際に時間をかけました。その背後にあるアイデアは非常に単純です。この記事を書いている時点での最新バージョンは 3.2.1 です。

コンシューマーが読み取るデータを保持する、事前に割り当てられたイベントを格納するバッファーがあります。

バッファは、バッファ スロットの可用性を示す長さのフラグの配列 (整数配列) によってサポートされます (詳細については、さらに参照してください)。配列は java#AtomicIntegerArray のようにアクセスされるため、この説明では配列が 1 であると想定することもできます。

プロデューサーの数に制限はありません。プロデューサがバッファに書き込もうとすると、長い数値が生成されます (AtomicLong#getAndIncrement を呼び出す場合と同様に、Disruptor は実際には独自の実装を使用しますが、同じように機能します)。これを生成された long を ProducerCallId と呼びましょう。同様に、consumerCallId は、コンシューマがバッファからスロットを読み取る ENDS 時に生成されます。最新の consumerCallId がアクセスされます。

(コンシューマーが多い場合は、ID が最も小さい呼び出しが選択されます。)

次に、これらの ID が比較され、両者の差がバッファー側よりも小さい場合、プロデューサーは書き込みを許可されます。

(producerCallId が最近の consumerCallId + bufferSize よりも大きい場合は、バッファーがいっぱいであることを意味し、プロデューサーはスポットが利用可能になるまでバス待機を強いられます。)

その後、生産者は自身の callId (prducerCallId modulo bufferSize) に基づいてバッファー内のスロットを割り当てられますが、bufferSize は常に 2 のべき乗 (バッファーの作成時に適用される制限) であるため、使用される実際の操作は producerCallId & (bufferSize - 1 )))。その後、そのスロットのイベントを自由に変更できます。

(実際のアルゴリズムはもう少し複雑で、最適化のために最近の consumerId を別のアトミック参照にキャッシュする必要があります。)

イベントが変更されると、変更が「公開」されます。フラグ配列のそれぞれのスロットを公開すると、更新されたフラグが入ります。フラグの値は、ループの数です (producerCallId を bufferSize で割った値です (ここでも bufferSize は 2 の累乗であるため、実際の操作は右シフトです)。

同様に、任意の数のコンシューマが存在する可能性があります。コンシューマーがバッファーにアクセスするたびに、consumerCallId が生成されます (コンシューマーがディスラプターにどのように追加されたかに応じて、ID 生成で使用されるアトミックが共有されるか、それぞれで個別に使用される場合があります)。次に、この consumerCallId が最新の ProducentCallId と比較され、2 つのうち小さい場合、リーダーは処理を続行できます。

(同様に、producerCallId が consumerCallId に対して偶数である場合、それはバッファーが空であり、コンシューマーが強制的に待機することを意味します。待機の方法は、ディスラプターの作成中に WaitStrategy によって定義されます。)

個々のコンシューマー (独自の ID ジェネレーターを持つコンシューマー) の場合、次にチェックされるのは、バッチで消費する機能です。バッファー内のスロットは、consumerCallId (インデックスはプロデューサーと同じ方法で決定されます) に対応するスロットから、最近の ProducerCallId に対応するスロットの順に調べられます。

これらは、フラグ配列に書き込まれたフラグ値と、consumerCallId 用に生成されたフラグ値を比較することにより、ループで検査されます。フラグが一致する場合は、スロットを埋めているプロデューサーが変更をコミットしたことを意味します。そうでない場合、ループは中断され、コミットされた最大の changeId が返されます。ConsumerCallId から changeId で受信されるまでのスロットは、バッチで消費できます。

コンシューマーのグループ (ID ジェネレーターが共有されているもの) が一緒に読み取られる場合、それぞれが 1 つの callId のみを取得し、その 1 つの callId のスロットのみがチェックされて返されます。

于 2014-06-05T14:23:06.307 に答える
8

この記事から:

ディスラプター パターンは、メモリ バリアを使用してシーケンスを通じてプロデューサーとコンシューマーを同期する、事前に割り当てられた転送オブジェクトで満たされた循環配列 (リング バッファー) によってバックアップされるバッチ キューです。

記憶の障壁について説明するのはちょっと難しいので、私の意見では、Trisha のブログ ( http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast) が最善の試みを行っています。 html

ただし、低レベルの詳細に飛び込みたくない場合は、Java のメモリ バリアがvolatileキーワードまたはjava.util.concurrent.AtomicLong. ディスラプター パターン シーケンスはAtomicLongであり、プロデューサーとコンシューマーの間でロックの代わりにメモリ バリアを介してやり取りされます。

概念はコードの方が理解しやすいので、以下のコードはCoralQueueの単純なhelloworldです。これは、私が所属している CoralBlocks によって行われたディスラプター パターンの実装です。以下のコードでは、ディスラプター パターンがバッチ処理を実装する方法と、リング バッファー (つまり、循環配列) が 2 つのスレッド間のガベージ フリー通信を可能にする方法を確認できます。

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
于 2014-06-16T22:38:45.353 に答える