3

以下のコード スニペットが実行している理由を理解しようとしています。シンクはソースがコンテンツを生成するよりも速く要求を生成できないため、一部のオファー (オーバーフロー戦略が [ドロップ バッファー] に設定されている) に応答してメッセージがドロップされ、エラーとキューが閉じられたというメッセージが表示されると考えていました。自爆ピースの後。

スニペット:

package playground

import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

case object MessageToSink

object Playground extends App {

  implicit val actorSystem = ActorSystem("Playground")
  implicit val execCntxt = actorSystem.dispatcher

  val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
  actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)

  println(s"Playground has started... ${LocalDateTime.now()}")
}

class Actor2SinkFwder extends Actor with ActorLogging {

  implicit val materializer = ActorMaterializer()
  implicit val execCtxt = context.dispatcher

  val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer)
    .to(Sink.foreach[Int] {
      i =>
        println(s"$i Sinking starts at ${LocalDateTime.now()}")
        Thread.sleep(150)
        if (i == 5) throw new RuntimeException("KaBoom!")
        println(s"$i Sinking completes at ${LocalDateTime.now()}")
    }).run()

  val i: AtomicInteger = new AtomicInteger(0)

  override def receive: Receive = {
    case MessageToSink =>
      val num = i.incrementAndGet()
      println(s"$num Sink Command received at ${LocalDateTime.now()}")
      flow.offer(num).collect {
        case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}")
        case Dropped => println(s"$num Dropped ${LocalDateTime.now}")
        case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err")
        case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed")
      }
   }
}

出力:

Playground has started... 2016-12-27T18:35:29.574
1 Sink Command received at 2016-12-27T18:35:29.640
2 Sink Command received at 2016-12-27T18:35:29.642
3 Sink Command received at 2016-12-27T18:35:29.642
1 Sinking starts at 2016-12-27T18:35:29.649
1 Enqueued 2016-12-27T18:35:29.650
4 Sink Command received at 2016-12-27T18:35:29.688
5 Sink Command received at 2016-12-27T18:35:29.738
6 Sink Command received at 2016-12-27T18:35:29.788
1 Sinking completes at 2016-12-27T18:35:29.799
2 Sinking starts at 2016-12-27T18:35:29.800
2 Enqueued 2016-12-27T18:35:29.800
7 Sink Command received at 2016-12-27T18:35:29.838
8 Sink Command received at 2016-12-27T18:35:29.888
9 Sink Command received at 2016-12-27T18:35:29.938
2 Sinking completes at 2016-12-27T18:35:29.950
3 Sinking starts at 2016-12-27T18:35:29.951
3 Enqueued 2016-12-27T18:35:29.951
10 Sink Command received at 2016-12-27T18:35:29.988
11 Sink Command received at 2016-12-27T18:35:30.038
12 Sink Command received at 2016-12-27T18:35:30.088
3 Sinking completes at 2016-12-27T18:35:30.101
4 Sinking starts at 2016-12-27T18:35:30.101
4 Enqueued 2016-12-27T18:35:30.101
13 Sink Command received at 2016-12-27T18:35:30.138
14 Sink Command received at 2016-12-27T18:35:30.189
15 Sink Command received at 2016-12-27T18:35:30.238
4 Sinking completes at 2016-12-27T18:35:30.251
5 Sinking starts at 2016-12-27T18:35:30.251
5 Enqueued 2016-12-27T18:35:30.252
16 Sink Command received at 2016-12-27T18:35:30.288
17 Sink Command received at 2016-12-27T18:35:30.338
18 Sink Command received at 2016-12-27T18:35:30.388
19 Sink Command received at 2016-12-27T18:35:30.438
20 Sink Command received at 2016-12-27T18:35:30.488
21 Sink Command received at 2016-12-27T18:35:30.538
22 Sink Command received at 2016-12-27T18:35:30.588
23 Sink Command received at 2016-12-27T18:35:30.638
24 Sink Command received at 2016-12-27T18:35:30.688
25 Sink Command received at 2016-12-27T18:35:30.738
26 Sink Command received at 2016-12-27T18:35:30.788
etc...

私の誤解は、QueueSource クラスでのgetAsyncCallbackの使用に関するものだと思います。QueueSource のオファー呼び出しが正しいオファーの詳細で stageLogic を呼び出しても、ステージ ロジック内のこのコードの実際のハンドラーは、前の要素の処理が完了するまで呼び出されないため、バッファー サイズをチェックしたり、オーバーフローを適用したりするためのロジックはありません。戦略が適用されています... :-/

4

2 に答える 2