私の質問はどこかに関連しています: Source.actorRef によって作成された akka ストリーム Source の基になる ActorRef にアクセスする いくつかの違いがあります:
- akka-stream 実験的 1.0 を使用しています
- 私はactorPublisherモデルを使用しています
- 並列処理によるストリーム定義に FlowGraph DSL を使用しています
Source が保持する Actor Publisher インスタンスにメッセージを送信するために、actorRef を取得する方法が見つかりません。
def run(implicit system: ActorSystem) = { import system.dispatcher implicit val materializer = ActorMaterializer() val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event } //Implementation in subpackage val sinkLevel1 = Sinks.sinkLevel1 val sinkLevel2 = Sinks.sinkLevel2 //Implementation in subpackage val stageTriage = FlowStages.stageTriage val stageEvalProcess1 = FlowStages.stageEvalProcess1 val stageEvalProcess2 = FlowStages.stageEvalProcess2 val pipeline = FlowGraph.closed(){ implicit builder => import FlowGraph.Implicits._ val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2)) source ~> stageTriage ~> stageDispatchByRuleLevels stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1 stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2 } pipeline.run() }
手伝ってくれてありがとう !
オリバー