0

次の単純なグラフがあるとします。

class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {

  val out = Outlet[A]("KafkaSource.out")

  override val shape = SourceShape.of(out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, kI.next)
        }
      })
    }
}

val g = GraphDSL.create(){ implicit b =>
  val source = b.add(new KafkaSource[Message](itr))
  val sink = b.add(Sink.foreach[Message](println))

  source ~> sink
  ClosedShape
}

私たちはそれを次のように実行しています

RunnableGraph.fromGraph(g).run()

次の使用可能な要素をプッシュする代わりに、kafkaSource に停止 (または人為的に完了) するように通知して、下流の接続されたステージも停止するようにしたいと考えています。

どうすればそれを達成できますか?

シナリオは、カフカに何百万ものメッセージがあり、毎日午後 9 時にメッセージの処理を停止したいと考えており、クリーン シャットダウンで実行中のアプリケーションを停止していると仮定します。

4

1 に答える 1