18

私のシステムには、タイプ A と B の 2 つの異なるタイプのメッセージがあります。各メッセージには異なる構造があります。タイプ A には int メンバーが含まれ、タイプ B には double メンバーが含まれます。私のシステムでは、両方のタイプのメッセージを多数のビジネス ロジック スレッドに渡す必要があります。待ち時間を短縮することは非常に重要であるため、ディスラプターを使用してメイン スレッドからビジネス ロジック スレッドにメッセージを機械的に適切に渡す方法を調査しています。

私の問題は、ディスラプターがリング バッファー内の 1 種類のオブジェクトしか受け付けないことです。これは、ディスラプタがリング バッファ内のオブジェクトを事前に割り当てるため、理にかなっています。ただし、Disruptor を介して 2 つの異なるタイプのメッセージをビジネス ロジック スレッドに渡すことも困難です。私が知る限り、次の 4 つのオプションがあります。

  1. 固定サイズのバイト配列を含むオブジェクトを使用するようにディスラプターを構成します( How should one use Disruptor (Disruptor Pattern) to build real-world message systems?で推奨されているように)。この場合、メイン スレッドはメッセージをディスラプタにパブリッシュする前にバイト配列にエンコードする必要があり、各ビジネス ロジック スレッドは受信時にバイト配列をデコードしてオブジェクトに戻す必要があります。このセットアップの欠点は、ビジネス ロジック スレッドが実際にはディスラプターからのメモリを共有していないことです。代わりに、ディスラプターによって提供されたバイト配列から新しいオブジェクトを作成しています (したがって、ガベージを作成しています)。このセットアップの利点は、すべてのビジネス ロジック スレッドが同じディスラプタから複数の異なるタイプのメッセージを読み取ることができることです。

  2. 単一タイプのオブジェクトを使用するようにディスラプターを構成しますが、オブジェクト タイプごとに 1 つずつ、複数のディスラプターを作成します。上記の場合、タイプ A のオブジェクト用とタイプ B のオブジェクト用の 2 つの個別のディスラプターがあります。このセットアップの利点は、メイン スレッドがオブジェクトをバイト配列にエンコードする必要がないことです。ビジネス レス ロジック スレッドは、ディスラプターで使用されるのと同じオブジェクトを共有できます (ガベージは作成されません)。このセットアップの欠点は、何らかの形で各ビジネス ロジック スレッドが複数のディスラプターからのメッセージをサブスクライブする必要があることです。

  3. メッセージ A と B の両方のすべてのフィールドを含む単一タイプの「スーパー」オブジェクトを使用するようにディスラプターを構成します。

  4. オブジェクト参照を使用するようにディスラプターを構成します。ただし、この場合、オブジェクトの事前割り当てとメモリの順序付けによるパフォーマンス上の利点が失われます。

この状況におすすめは?オプション #2 が最もクリーンなソリューションだと思いますが、消費者が複数のディスラプターからのメッセージを技術的にサブスクライブできるかどうか、またはどのようにサブスクライブできるかはわかりません。オプション#2を実装する方法の例を誰かが提供できれば、それは大歓迎です!

4

3 に答える 3

3

固定サイズのバイト配列を含むオブジェクトを使用するようにディスラプターを構成します (How should one use Disruptor (Disruptor Pattern) to build real-world message systems? で推奨されているように)。この場合、メイン スレッドはメッセージをディスラプタにパブリッシュする前にバイト配列にエンコードする必要があり、各ビジネス ロジック スレッドは受信時にバイト配列をデコードしてオブジェクトに戻す必要があります。このセットアップの欠点は、ビジネス ロジック スレッドが実際にはディスラプターからのメモリを共有していないことです。代わりに、ディスラプターによって提供されたバイト配列から新しいオブジェクトを作成しています (したがって、ガベージを作成しています)。このセットアップの利点は、すべてのビジネス ロジック スレッドが同じディスラプタから複数の異なるタイプのメッセージを読み取ることができることです。

これは私の好みのアプローチですが、ディスラプターを使用したほぼすべての場所で、ある種の I/O デバイスから受信または送信しているため、ユースケースによって少し色付けしました。したがって、基本的な通貨はバイト配列です。フライウェイト アプローチをマーシャリングに使用することで、オブジェクトの作成を回避できます。この例を見るために、Devoxx ( https://github.com/mikeb01/ticketing )で提示した例で Javolution の Struct クラスと Union クラスを使用しました。イベント ハンドラからの onEvent 呼び出しから戻る前にオブジェクトを完全に処理できる場合、このアプローチはうまく機能します。イベントがその時点を超えて存続する必要がある場合は、データの何らかのコピーを作成する必要があります。たとえば、データをオブジェクトに逆シリアル化します。

単一タイプのオブジェクトを使用するようにディスラプターを構成しますが、オブジェクト タイプごとに 1 つずつ、複数のディスラプターを作成します。上記の場合、タイプ A のオブジェクト用とタイプ B のオブジェクト用の 2 つの個別のディスラプターがあります。このセットアップの利点は、メイン スレッドがオブジェクトをバイト配列にエンコードする必要がないことです。ビジネスレスロジックスレッドは、ディスラプターで使用されるのと同じオブジェクトを共有できます (ガベージは作成されません)。このセットアップの欠点は、何らかの形で各ビジネス ロジック スレッドが複数のディスラプターからのメッセージをサブスクライブする必要があることです。

このアプローチを試していない場合は、複数のリング バッファーからポーリングできるカスタム EventProcessor が必要になる可能性があります。

メッセージ A と B の両方のすべてのフィールドを含む単一タイプの「スーパー」オブジェクトを使用するようにディスラプターを構成します。オブジェクト参照を使用するようにディスラプターを構成します。ただし、この場合、オブジェクトの事前割り当てとメモリの順序付けによるパフォーマンス上の利点が失われます。

これは、事前割り当ての欠如が許容されるいくつかのケースで行われました。それは問題なく動作します。オブジェクトを渡す場合は、コンシューマー側でそれらを使い終わったら、それらを無効にする必要があります。「スーパー」オブジェクトに二重ディスパッチ パターンを使用すると、実装がかなりきれいに保たれることがわかりました。これの欠点の 1 つは、マーク フェーズ中に GC が通過するライブ オブジェクトが増えるため、オブジェクトの直線的な配列である何かで GC ストールがわずかに長くなることです。

この状況におすすめは?オプション #2 が最もクリーンなソリューションだと思いますが、消費者が技術的に複数のディスラプターからのメッセージをサブスクライブできるかどうか、またはどのようにサブスクライブできるかはわかりません。オプション#2を実装する方法の例を誰かが提供できれば、それは大歓迎です!

もう 1 つのオプションは、データの使用に関して完全な柔軟性が必要な場合は、リング バッファーを使用せず、代わりにシーケンサーに直接話しかけて、最適と思われるオブジェクト レイアウトを定義することです。

于 2015-05-18T16:16:02.440 に答える
2

Ben Baumgold さん、もう解決策を見つけたと思います。#4 (または #3) は、イベント ホルダーを作成することで簡単に実装できます。オブジェクトの列挙型と考えてください。ルックアップを高速化するには、イベントを列挙型で強化する必要があります。注意、元のイベントへの参照をホルダーに保存しています。コピー コンストラクターまたは clone() を作成し、リング バッファーへの挿入時にイベントをコピーする方が適切な場合があります。

例による説明:

// これはイベントで使用される列挙型です

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

// これはホルダーです。ringbuffer 内のこのインスタンスは、常に、array[ type.ordinal() ] によってインデックス付けされた 1 つのイベントのみを保持します。配列がコードから明らかであるべき理由。

public class RingBufferEventHolder {    
 private MyEventEnum;   
 private EventBase array[];

 public RingBufferEventHolder() {
    array=new EventBase[MyEventEnum.values().length]; 
 }

 // TODO: null the rest
 public void setEvent(EventBase event) {
    type=event.getType();
    switch( event.getType() ) {
        case EVENT_TIMER:
            array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
            break;
        case EVENT_MARKETDATA:
            array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
            break;
        default:
            throw new RuntimeException("Unknown event type " + event );
    }
}

// イベントを公開

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);
于 2016-10-20T06:13:53.750 に答える