以下のコード スニペットが実行している理由を理解しようとしています。シンクはソースがコンテンツを生成するよりも速く要求を生成できないため、一部のオファー (オーバーフロー戦略が [ドロップ バッファー] に設定されている) に応答してメッセージがドロップされ、エラーとキューが閉じられたというメッセージが表示されると考えていました。自爆ピースの後。
スニペット:
package playground
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
case object MessageToSink
object Playground extends App {
implicit val actorSystem = ActorSystem("Playground")
implicit val execCntxt = actorSystem.dispatcher
val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)
println(s"Playground has started... ${LocalDateTime.now()}")
}
class Actor2SinkFwder extends Actor with ActorLogging {
implicit val materializer = ActorMaterializer()
implicit val execCtxt = context.dispatcher
val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
.to(Sink.foreach[Int] {
i =>
println(s"$i Sinking starts at ${LocalDateTime.now()}")
Thread.sleep(150)
if (i == 5) throw new RuntimeException("KaBoom!")
println(s"$i Sinking completes at ${LocalDateTime.now()}")
}).run()
val i: AtomicInteger = new AtomicInteger(0)
override def receive: Receive = {
case MessageToSink =>
val num = i.incrementAndGet()
println(s"$num Sink Command received at ${LocalDateTime.now()}")
flow.offer(num).collect {
case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
}
}
}
出力:
Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...
私の誤解は、QueueSource クラスでのgetAsyncCallbackの使用に関するものだと思います。QueueSource のオファー呼び出しが正しいオファーの詳細で stageLogic を呼び出しても、ステージ ロジック内のこのコードの実際のハンドラーは、前の要素の処理が完了するまで呼び出されないため、バッファー サイズをチェックしたり、オーバーフローを適用したりするためのロジックはありません。戦略が適用されています... :-/