3

私はLMAX Disruptorの経験があり、 disruptorを使用してカスタム アクター メールボックスを実装したいと思っています。

ガイドラインはありますか?それは可能ですか?Akka のアクター メールボックスの制限は何ですか?

4

1 に答える 1

3

ここで述べたように、いくつかのメソッドを実装するだけで済みます。もちろん、リング バッファへのポインタを使用してメッセージを直接読み書きする必要があります。また、次の点にも注意してください。

  • 通常、ディスラプターは大量のメモリを事前に割り当てるため、アクターごとに 1 つのディスラプターを使用することはお勧めできませんBalancingPool

  • 異なるメッセージ タイプの消費、ジャーナリング、修復などの個別のコンシューマーが必要な場合は、別の RingBufferPointer (smthng のような) インスタンスをパラメーターとしてメールボックスに渡す必要があります (ジャーナリングには同じ開始値を使用し、異なるものには異なる開始値を使用します)。メッセージタイプ)、ただし、1 つの Disruptor を使用します。したがって、異なるメールボックスは 1 つのディスラプターを参照します。

  • メッセージの作成、抽出などの低レベルの制御が失われるため、デフォルトではバッチ割り当てはありません。

  • リングからの履歴を使用して、失敗したアクターの状態 (スーパーバイザー内preRestartまたはスーパーバイザー内) を復元することもできます。

LMAX のコメント:

従来のアプローチとは異なる方法で機能するため、慣れ親しんだ方法とは少し異なる方法で使用できます。たとえば、パターンをシステムに適用することは、すべてのキューをマジック リング バッファに置き換えるほど簡単ではありません。ガイドとなるコード サンプル、機能の概要を説明するブログや記事の数が増えていること、ご想像のとおりテクニカル ペーパーが詳細に説明されていること、パフォーマンス テストが Disruptor の使用方法の例を示していることなどがあります。 http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

そして、これはキュー/ディスラプター/アクターの短い比較です

疑似スカラコードでは、次のようになります。

object MyUnboundedMailbox {
  val buffer = new RingBuffer()

  class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then  

    }
    def dequeue(): Envelope = readerPointer.poll()
    def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
    def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
  }

  trait MyUnboundedMessageQueueSemantics 

}

class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
  with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._
  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]): MessageQueue = {

    val pointer = ring.newPointer
    val read = pointer.copy
    val write = pointer.copy
    new MyMessageQueue(pointer, read, write) 
  }
    // you may use another strategy here based on owner (you can access name and path here), 
    // so for example may allocate same pointers for same prefixes in the name or path 
}

変更されていない MyMessageQueue.startPointer を使用して、障害回復中にメッセージ ログにアクセスできます (類似性については、akka のイベント ソーシングを参照することもできます)。

リングが「終了」すると、非常に古い未配信メッセージが新しいバージョンで上書きされる可能性があるため、 UnboundedQueue アプローチを使用しても、ここでのメッセージ配信は保証されません。

于 2015-01-06T09:35:23.160 に答える