私はLMAX Disruptorの経験があり、 disruptorを使用してカスタム アクター メールボックスを実装したいと思っています。
ガイドラインはありますか?それは可能ですか?Akka のアクター メールボックスの制限は何ですか?
私はLMAX Disruptorの経験があり、 disruptorを使用してカスタム アクター メールボックスを実装したいと思っています。
ガイドラインはありますか?それは可能ですか?Akka のアクター メールボックスの制限は何ですか?
ここで述べたように、いくつかのメソッドを実装するだけで済みます。もちろん、リング バッファへのポインタを使用してメッセージを直接読み書きする必要があります。また、次の点にも注意してください。
通常、ディスラプターは大量のメモリを事前に割り当てるため、アクターごとに 1 つのディスラプターを使用することはお勧めできませんBalancingPool
。
異なるメッセージ タイプの消費、ジャーナリング、修復などの個別のコンシューマーが必要な場合は、別の RingBufferPointer (smthng のような) インスタンスをパラメーターとしてメールボックスに渡す必要があります (ジャーナリングには同じ開始値を使用し、異なるものには異なる開始値を使用します)。メッセージタイプ)、ただし、1 つの Disruptor を使用します。したがって、異なるメールボックスは 1 つのディスラプターを参照します。
メッセージの作成、抽出などの低レベルの制御が失われるため、デフォルトではバッチ割り当てはありません。
リングからの履歴を使用して、失敗したアクターの状態 (スーパーバイザー内preRestart
またはスーパーバイザー内) を復元することもできます。
LMAX のコメント:
従来のアプローチとは異なる方法で機能するため、慣れ親しんだ方法とは少し異なる方法で使用できます。たとえば、パターンをシステムに適用することは、すべてのキューをマジック リング バッファに置き換えるほど簡単ではありません。ガイドとなるコード サンプル、機能の概要を説明するブログや記事の数が増えていること、ご想像のとおりテクニカル ペーパーが詳細に説明されていること、パフォーマンス テストが Disruptor の使用方法の例を示していることなどがあります。 http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
そして、これはキュー/ディスラプター/アクターの短い比較です
疑似スカラコードでは、次のようになります。
object MyUnboundedMailbox {
val buffer = new RingBuffer()
class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then
}
def dequeue(): Envelope = readerPointer.poll()
def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
}
trait MyUnboundedMessageQueueSemantics
}
class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue = {
val pointer = ring.newPointer
val read = pointer.copy
val write = pointer.copy
new MyMessageQueue(pointer, read, write)
}
// you may use another strategy here based on owner (you can access name and path here),
// so for example may allocate same pointers for same prefixes in the name or path
}
変更されていない MyMessageQueue.startPointer を使用して、障害回復中にメッセージ ログにアクセスできます (類似性については、akka のイベント ソーシングを参照することもできます)。
リングが「終了」すると、非常に古い未配信メッセージが新しいバージョンで上書きされる可能性があるため、 UnboundedQueue アプローチを使用しても、ここでのメッセージ配信は保証されません。