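I have been getting familiar with Akka Streams and wrote the Fibonacci publisher/subscriber example below. However, I still don't understand how demand is initially generated, or how it relates to the subscriber's request strategy. Can someone explain?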

Fibonacci publisher:

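import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import scala.collection.mutable.Queue

class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
  // Holds the next two Fibonacci numbers to emit.
  private val queue = Queue[Long](0, 1)

  def receive = {
    case Request(_) => // _ is the newly requested count; totalDemand is the outstanding total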
      log.debug("Received request; demand = {}.", totalDemand)
      publish
    case Cancel =>
      log.info("Stopping.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }

  final def publish = {
    while (isActive && totalDemand > 0) {
      val next = queue.head
      queue += (queue.dequeue + queue.head)

      log.debug("Producing fibonacci number: {}.", next)

      onNext(next)

      if (next > 5000) self ! Cancel
    }
  }
}

Fibonacci subscriber:

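import akka.actor.ActorLogging
import akka.stream.actor.{ActorSubscriber, WatermarkRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
  // High watermark of 20; the low watermark is left at its default.
  val requestStrategy = WatermarkRequestStrategy(20)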

  def receive = {
    case OnNext(fib: Long) =>
      log.debug("Received Fibonacci number: {}", fib)

      if (fib > 5000) self ! OnComplete
    case OnError(ex: Exception) =>
      log.error(ex, ex.getMessage)
      self ! OnComplete
    case OnComplete =>
      log.info("Fibonacci stream completed.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }
}
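
For context on the request strategy used above, the watermark rule can be sketched without Akka as follows. This is a simplified stand-in, not the library's class: `WatermarkSketch` and `WatermarkDemo` are hypothetical names, and the default low watermark of half the high watermark (at least 1) is an assumption about how the single-argument `WatermarkRequestStrategy(20)` behaves.

```scala
// Simplified stand-in for a watermark request strategy (hypothetical name,
// not part of Akka). It only models the demand calculation.
final case class WatermarkSketch(highWatermark: Int, lowWatermark: Int) {
  // Given how many requested elements are still outstanding, return how many
  // more to request: top up to the high watermark once below the low one.
  def requestDemand(remainingRequested: Int): Int =
    if (remainingRequested < lowWatermark) highWatermark - remainingRequested
    else 0
}

object WatermarkSketch {
  // Assumed default: low watermark is half the high watermark, at least 1.
  def apply(highWatermark: Int): WatermarkSketch =
    WatermarkSketch(highWatermark, math.max(1, highWatermark / 2))
}

object WatermarkDemo extends App {
  val strategy = WatermarkSketch(20)
  println(strategy.requestDemand(0))  // 20: nothing outstanding, request a full batch
  println(strategy.requestDemand(10)) // 0: not yet below the low watermark of 10
  println(strategy.requestDemand(9))  // 11: below the low watermark, top up to 20
}
```

Under this sketch the subscriber's own first request would be 20 rather than 4, so the 4 seen by the publisher presumably originates somewhere else in the stream, for example at an intermediate stage between the two actors. That is an inference from the sketch, not something the code above confirms.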

Fibonacci app:

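import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
implicit val system = ActorSystem("fibonacci") // assumed setup; name taken from the log prefix
implicit val materializer = ActorMaterializer() // assumed setup, needed by runWith
val src = Source.actorPublisher[Long](Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])

src.via(flow).runWith(sink)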

Sample run. Question: where does the initial demand of 4 come from?

2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.

1 Answer