2

私は、1 つのソース、1 つのシンク、およびそれらの間に 2 つの「フロー」を持つ単純なフローを構築しようとしています。だから何か

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
  val in = Source(1 to 10)
  val out = Sink.ignore
  val f1 = Flow[Int].map(_ + 1)
  val f2 = Flow[Int].map(_ + 2)
  builder.addEdge(builder.add(in), f1, builder.add(out))

  // builder.addEdge(builder.add(in), f1, f2, builder.add(out)) // does not compile

}.run

コメント行はコンパイルされませんが、私が達成しようとしていることを示しています。

この例は、3 を加算する新しい関数を定義したり、関数を構成したりするのと同じくらい簡単になるように工夫されていますが、実際には関数ははるかに複雑であり、簡単にするために分離されています。

ここでは、ファンアウトやファンインを行うつもりはありません。それらの間に任意の数の関数を含めることができる単純な流れです。

ありがとう。

4

2 に答える 2

2

viaメソッド onは、Flow必要なことを実行する必要があります (つまりf1 via f2)。

scaladocを参照してください。

あなたもできることに注意してください

val f = Flow[Int].
  map(_ + 1).
  map(_ + 2)

別居を続けたい場合。または、関数をg1およびとして抽出するとg2、次のこともできます

val g1 = (i: Int) => i + 1
val g2 = (i: Int) => i + 2
val f = Flow[Int].map(g1 andThen g2)

一般的には、可能な限り関数を操作し、本当に必要な場合に備えてフローを保存することをお勧めします。

于 2015-07-15T06:00:20.430 に答える
1

ここでの問題は、FlowShape が必要なことです。

addEdge を使用するには 2 つの方法があります。

def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit

def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit

ビルダーを使用してやりたいことを行うには、2 つの FlowShape を作成し、 を使用しfrom: Outlet[T], to: Inlet[T]てそれらを接続します。

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
    val in = Source(1 to 10)
    val out = Sink.foreach(println)

//    val f1: Flow[Int, Int, Unit] = Flow[Int].map(_ + 1)
//    val f2: Flow[Int, Int, Unit] = Flow[Int].map(_ + 2)

    val f1: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
    val f2: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 2))
    builder.addEdge(builder.add(in), f1.inlet) //Source to f1 in
    builder.addEdge(f1.outlet, f2.inlet) // f1 out to f2 in
    builder.addEdge(f2.outlet, builder.add(out)) // f2 out to sink
  }.run()

違いがわかるように種類を残しました。

それを行う 2 番目のオプションは、部分グラフを使用して FlowShape を作成することです。

val partialFlow: Graph[FlowShape[Int, Int], Unit] = FlowGraph.partial() { builder =>
    val f1 = builder.add(Flow[Int].map(_ + 1))
    val f2 = builder.add(Flow[Int].map(_ + 2))
    builder.addEdge(f1.outlet, f2.inlet)

    FlowShape(f1.inlet, f2.outlet)
  }

FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
    val in = Source(1 to 10)
    val out = Sink.foreach(println)
    builder.addEdge(builder.add(in), partialFlow, builder.add(out))
  }.run()
于 2015-08-07T07:28:07.597 に答える