ここでの問題は、FlowShape が必要なことです。
addEdge を使用するには 2 つの方法があります。
def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit
と
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit
ビルダーを使用してやりたいことを行うには、2 つの FlowShape を作成し、 を使用しfrom: Outlet[T], to: Inlet[T]
てそれらを接続します。
FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
val in = Source(1 to 10)
val out = Sink.foreach(println)
// val f1: Flow[Int, Int, Unit] = Flow[Int].map(_ + 1)
// val f2: Flow[Int, Int, Unit] = Flow[Int].map(_ + 2)
val f1: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
val f2: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 2))
builder.addEdge(builder.add(in), f1.inlet) //Source to f1 in
builder.addEdge(f1.outlet, f2.inlet) // f1 out to f2 in
builder.addEdge(f2.outlet, builder.add(out)) // f2 out to sink
}.run()
違いがわかるように種類を残しました。
それを行う 2 番目のオプションは、部分グラフを使用して FlowShape を作成することです。
val partialFlow: Graph[FlowShape[Int, Int], Unit] = FlowGraph.partial() { builder =>
val f1 = builder.add(Flow[Int].map(_ + 1))
val f2 = builder.add(Flow[Int].map(_ + 2))
builder.addEdge(f1.outlet, f2.inlet)
FlowShape(f1.inlet, f2.outlet)
}
FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
val in = Source(1 to 10)
val out = Sink.foreach(println)
builder.addEdge(builder.add(in), partialFlow, builder.add(out))
}.run()