1

ストリームの次のパスがあります-

 kafkaStream[message] -> 
 kafkaStream[message] -> mergedKafkaStream[message] -> stream[EnrichedMessage] -> I/O
 kafkaStream[message] -> 

これを akka ストリームの方法で記述する方法がわかりません。フォローしてみました(疑似)。

KafkaStream extends ActorPublisher[message] {

}

IOHandler extends ActorSubscriber {

}

k1、k2、k3 は kafka ストリーム パブリッシャーです。

f = Flow[message].map(_.enrichMessage)

FlowGraph { b =>
  k1 ~> merge
  k2 ~> merge
  k3 ~> merge
  merge ~> f ~> ioHandlerSink
}

これがパブリッシャをシンクに接続する方法です。しかし、ここで解決したい問題は遅い IO です。IOHandler アクターはメッセージの処理が非常に遅いため、複数の IOHandler を使用してタスクを分散するにはどうすればよいでしょうか。また、バックプレッシャーを維持したいので、火を使わず、ルーターを使用することを忘れています。

私はakkaストリームに非常に慣れていないので、方法を提案してください。

ありがとう

4

1 に答える 1