次の単純なグラフがあるとします。
class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {
val out = Outlet[A]("KafkaSource.out")
override val shape = SourceShape.of(out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, kI.next)
}
})
}
}
val g = GraphDSL.create(){ implicit b =>
val source = b.add(new KafkaSource[Message](itr))
val sink = b.add(Sink.foreach[Message](println))
source ~> sink
ClosedShape
}
私たちはそれを次のように実行しています
RunnableGraph.fromGraph(g).run()
次の使用可能な要素をプッシュする代わりに、kafkaSource に停止 (または人為的に完了) するように通知して、下流の接続されたステージも停止するようにしたいと考えています。
どうすればそれを達成できますか?
シナリオは、カフカに何百万ものメッセージがあり、毎日午後 9 時にメッセージの処理を停止したいと考えており、クリーン シャットダウンで実行中のアプリケーションを停止していると仮定します。