0

入力要素が出力に渡され、(フィルターを使用して) すべてをドロップするフィードバック ループに送り返される小さなサンプル グラフを作成しました。

フィードバック ブランチはすべてをドロップするため、アイデンティティ Flow[T] と同じ動作が得られることを期待しています。

代わりに、入力要素は期待どおりに発行されますが、実体化は​​完了しません。

私は何か間違ったことをしていますか?これは起こるはずですか?入力ストリームが完了すると、ブロードキャストのフィードバック出力が完了してはいけませんか?

問題は、ここで説明されている鶏と卵のシナリオに似ていると思いますか?

akka-stream-experimental 2.0.3 を使用しています

乾杯

object Test extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dropEverything = b.add(Flow[Int].filter(_ => false))
    val input = b.add(Merge[Int](2))
    val bcast = b.add(Broadcast[Int](2))

    input                         ~> bcast
    input.in(1) <~ dropEverything <~ bcast.out(1)

    FlowShape(input.in(0), bcast.out(0))
  })

  val result = Source.single(42).via(flow).runWith(Sink.foreach(println))

  try {
    // prints 42 but the stream doesn't terminate and the await timeouts
    println(Await.result(result, 5.seconds))
  } finally {
    system.terminate()
  }
}
4

1 に答える 1

1

これはここで回答されています。とが互いに完了するのを待っているためMerge、サイクルは決して完了しません。Broadcast

これを に変更して、これval input = b.add(Merge[Int](2, eagerComplete = true))を防ぐことができます。

または、入力からプロセスまでの要素の数を試すこともできます。この場合ですval dropEverything = b.add(Flow[Int].take(1).filter(_ => false))n1

于 2016-09-20T17:51:37.713 に答える