5

簡単な例でディスラプターのデザインパターンとは何か教えてもらえますか? このデザインパターンの基本を知りたいです。

4

3 に答える 3

4

簡単な Google で、Martin Fowler によるこの紹介を含め、多くの情報を得ることができます

大雑把なレベルでは、Disruptor はキューのマルチキャスト グラフと考えることができます。このマルチキャスト グラフでは、プロデューサーがオブジェクトを配置し、すべてのコンシューマーに送信して、個別のダウンストリーム キューを介して並行して消費します。内部を見ると、このキューのネットワークが実際には 1 つのデータ構造 (リング バッファー) であることがわかります。各プロデューサーとコンシューマーには、現在作業中のバッファー内のスロットを示すシーケンス カウンターがあります。各プロデューサー/コンシューマーは、独自のシーケンス カウンターを書き込みますが、他のシーケンス カウンターを読み取ることができます。このようにして、プロデューサーはコンシューマーのカウンターを読み取って、カウンターをロックすることなく、書き込みたいスロットが利用可能であることを確認できます。同様に、コンシューマーは、カウンターを監視することで、別のコンシューマーがメッセージを処理した後にのみメッセージを処理するようにすることができます。

GitHub プロジェクトには、Java コードとドキュメントが含まれています。

于 2013-01-31T16:50:29.640 に答える
2

この記事から:

ディスラプター パターンは、メモリ バリアを使用してシーケンスを通じてプロデューサーとコンシューマーを同期する、事前に割り当てられた転送オブジェクトで満たされた循環配列 (リング バッファー) によってバックアップされるバッチ キューです。

幸いなことに、ディスラプター パターンを使用するために本質的な詳細を理解する必要はありません。コードの方が理解しやすい場合は、以下に示すのはCoralQueueのHello Worldです。これは、ディスラプター パターンを実装するスレッド間通信用の超低レイテンシー キューです。

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;

public class Basics {

    public static void main(String[] args) {

        final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
            @Override
            public StringBuilder newInstance() {
                return new StringBuilder(1024);
            }
        });

        Thread producer = new Thread(new Runnable() {

            private final StringBuilder getStringBuilder() {
                StringBuilder sb;
                while((sb = queue.nextToDispatch()) == null) {
                    // queue can be full if the size of the queue
                    // is small and/or the consumer is too slow

                    // busy spin (you can also use a wait strategy instead)
                }
                return sb;
            }

            @Override
            public void run() {

                StringBuilder sb;

                while(true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to send a message to
                    // the other thread you can just do:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hello!");
                    queue.flush();

                    // you can also send in batches to increase throughput:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi!");

                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi again!");

                    queue.flush(); // dispatch the two messages above...
                }
            }
        }, "Producer");

        Thread consumer = new Thread(new Runnable() {

            @Override
            public void run() {

                while (true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to check if the producer
                    // has sent a message you just do:

                    long avail;
                    while((avail = queue.availableToPoll()) == 0) {
                        // queue can be empty!
                        // busy spin (you can also use a wait strategy instead)
                    }

                    for(int i = 0; i < avail; i++) {
                        StringBuilder sb = queue.poll();
                        // (...) do whatever you want to do with the data
                        // just don't call toString() to create garbage...
                        // copy byte-by-byte instead...
                    }
                    queue.donePolling();
                }
            }
        }, "Consumer");

        consumer.start();
        producer.start();
    }
}
于 2014-05-24T00:09:55.637 に答える