1

SinkSourceとを提供する をSink探していますSource。要素がそれに流れ込む場合Sink、対応する で提供する必要がありますSource。次のコードは、私が何を意味するかを示しています。

object SinkSource {
  def apply[T] = new {
    def sink: Sink[T] = ???
    def source: Source[T] = ???
  }
}
val flowgraph = FlowGraph { implicit fgb =>
  import FlowGraphImplicits._
  val sinksource = SinkSource[Int]
  Source(1 to 5) ~> sinksource.sink
                    sinksource.source ~> Sink.foreach(print)
}
implicit val actorSystem = ActorSystem(name = "System")
implicit val flowMaterializer = FlowMaterializer()
val materializedMap = flowgraph.run()

実行すると、次のように表示されます: 12345
So, does a SinkSourceexist (haven't seen it in the API) or does someone know how to implement it? Sinkandへの別個のアクセスが必要であることを言及する必要があるSourceためFlow、これはこの特定の形式では解決策ではありません。

Source(1 to 5) ~> Flow[Int] ~> Sink.foreach(println)
4

1 に答える 1

1

よくあることですが、質問が既に出されている場合は、アイデアが頭に浮かびSinkます。 だからここに行く:SourceJunctionInPortJunctionOutPort

object SinkSource {
  def apply[T](implicit fgb: FlowGraphBuilder) = new SinkSource[T]
}
class SinkSource[T](implicit fgb: FlowGraphBuilder) {
  import FlowGraphImplicits._
  private val merge = Merge[T]
  private val bcast = Broadcast[T]
  Source.empty ~> merge
  merge ~> bcast
  bcast ~> Sink.ignore
  def in: JunctionInPort[T] = merge
  def out: JunctionOutPort[T] = bcast
}
val flowgraph = FlowGraph { implicit fgb =>
  import FlowGraphImplicits._
  val source = Source(1 to 5)
  val sink = Sink.foreach(println)
  val sinkSource = SinkSource[Int]
  source ~> sinkSource.in
            sinkSource.out ~> sink
}
implicit val actorSystem = ActorSystem(name = "System")
implicit val flowMaterializer = FlowMaterializer()
val materializedMap = flowgraph.run()
于 2015-01-27T17:10:49.960 に答える