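I'm getting familiar with Akka Streams and have written the Fibonacci publisher/subscriber example below. However, I still don't clearly understand how demand is initially generated, or how it relates to the subscriber's request strategy. Can someone explain?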
Fibonacci publisher:
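import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import scala.collection.mutable.Queue

class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
  // Holds the next two Fibonacci numbers to emit.
  private val queue = Queue[Long](0, 1)

  def receive = {
    case Request(_) => // _ is the demand
      log.debug("Received request; demand = {}.", totalDemand)
      publish()
    case Cancel =>
      log.info("Stopping.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }

  // Emit numbers for as long as downstream demand is outstanding.
  final def publish(): Unit = {
    while (isActive && totalDemand > 0) {
      val next = queue.head
      queue += (queue.dequeue + queue.head)
      log.debug("Producing fibonacci number: {}.", next)
      onNext(next)
      if (next > 5000) self ! Cancel
    }
  }
}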
Fibonacci subscriber:
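import akka.actor.ActorLogging
import akka.stream.actor.{ActorSubscriber, WatermarkRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
  // Controls how many elements this subscriber requests from upstream.
  val requestStrategy = WatermarkRequestStrategy(20)

  def receive = {
    case OnNext(fib: Long) =>
      log.debug("Received Fibonacci number: {}", fib)
      if (fib > 5000) self ! OnComplete
    case OnError(ex: Exception) =>
      log.error(ex, ex.getMessage)
      self ! OnComplete
    case OnComplete =>
      log.info("Fibonacci stream completed.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }
}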
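For context, here is a minimal sketch of how I understand a watermark-based request strategy to compute demand. `WatermarkSketch` is a hypothetical stand-in written for illustration, not Akka's `WatermarkRequestStrategy`, and the default low watermark of half the high watermark is an assumption taken from my reading of the Akka documentation. Under this model, a strategy created with 20 would initially ask for 20 elements, which does not obviously match the demand of 4 in the log further down.

```scala
// Hypothetical model of a watermark-based request strategy; not the Akka class itself.
final case class WatermarkSketch(highWatermark: Int) {
  // Assumption: the low watermark defaults to half of the high watermark (at least 1).
  val lowWatermark: Int = math.max(1, highWatermark / 2)

  // How many more elements to request, given how many requested elements
  // are still outstanding (requested but not yet received).
  def requestDemand(remainingRequested: Int): Int =
    if (remainingRequested < lowWatermark) highWatermark - remainingRequested
    else 0
}

object WatermarkSketchDemo extends App {
  val strategy = WatermarkSketch(20)
  println(strategy.requestDemand(0))  // nothing outstanding: prints 20
  println(strategy.requestDemand(12)) // still above the low watermark: prints 0
  println(strategy.requestDemand(9))  // below the low watermark: prints 11
}
```

If this reading is right, the subscriber tops its outstanding demand back up to the high watermark whenever it drops below the low watermark, rather than requesting one element at a time.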
Fibonacci app:
val src = Source.actorPublisher(Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])
src.via(flow).runWith(sink)
Sample run (question: where does the initial demand of 4 come from?):
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.