0

反応ストリーム サブスクライバーを akka ソースにアタッチしようとしています。

私のソースは単純なシンク (foreach など) で問題なく動作するようですが、サブスクライバーから作成された実際のシンクを挿入すると、何も得られません。

私のコンテキストは次のとおりです。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}

implicit val system =  ActorSystem.create("test")
implicit val materializer = ActorMaterializer.create(system)

class PrintSubscriber extends Subscriber[String] {
  override def onError(t: Throwable): Unit = {}
  override def onSubscribe(s: Subscription): Unit = {}
  override def onComplete(): Unit = {}
  override def onNext(t: String): Unit = {
    println(t)
  }
}

私のテストケースは次のとおりです。

val subscriber = new PrintSubscriber()
val sink = Sink.fromSubscriber(subscriber)

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc"))
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz"))
source1.to(sink).run()(materializer)
source2.runForeach(println)

出力が得られます:

aaa
bbb
ccc

xxx、yyy、zzz が表示されないのはなぜですか?

4

1 に答える 1