ストリームの次のパスがあります-
kafkaStream[message] ->
kafkaStream[message] -> mergedKafkaStream[message] -> stream[EnrichedMessage] -> I/O
kafkaStream[message] ->
これを akka ストリームの方法で記述する方法がわかりません。フォローしてみました(疑似)。
KafkaStream extends ActorPublisher[message] {
}
IOHandler extends ActorSubscriber {
}
k1、k2、k3 は kafka ストリーム パブリッシャーです。
f = Flow[message].map(_.enrichMessage)
FlowGraph { b =>
k1 ~> merge
k2 ~> merge
k3 ~> merge
merge ~> f ~> ioHandlerSink
}
これがパブリッシャをシンクに接続する方法です。しかし、ここで解決したい問題は遅い IO です。IOHandler アクターはメッセージの処理が非常に遅いため、複数の IOHandler を使用してタスクを分散するにはどうすればよいでしょうか。また、バックプレッシャーを維持したいので、火を使わず、ルーターを使用することを忘れています。
私はakkaストリームに非常に慣れていないので、方法を提案してください。
ありがとう