1

私の質問はどこかに関連しています: Source.actorRef によって作成された akka ストリーム Source の基になる ActorRef にアクセスする いくつかの違いがあります:

  1. akka-stream 実験的 1.0 を使用しています
  2. 私はactorPublisherモデルを使用しています
  3. 並列処理によるストリーム定義に FlowGraph DSL を使用しています

Source が保持する Actor Publisher インスタンスにメッセージを送信するために、actorRef を取得する方法が見つかりません。

 def run(implicit system: ActorSystem) = {
   import system.dispatcher
   implicit val materializer = ActorMaterializer()

   val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

   //Implementation in subpackage
   val sinkLevel1 = Sinks.sinkLevel1 
   val sinkLevel2 = Sinks.sinkLevel2

   //Implementation in subpackage
   val stageTriage = FlowStages.stageTriage    
   val stageEvalProcess1 = FlowStages.stageEvalProcess1
   val stageEvalProcess2 = FlowStages.stageEvalProcess2

   val pipeline = FlowGraph.closed(){ implicit builder => 
     import FlowGraph.Implicits._

     val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

     source ~> stageTriage ~> stageDispatchByRuleLevels
                              stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
                              stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2

   }

   pipeline.run()

 }

手伝ってくれてありがとう !

オリバー

4

1 に答える 1

1

リンクされた質問のノアの回答に基づいて、追加すると

val ref = pipeline.run()

その後、次のように ref にメッセージを送信できます

ref ! ...
于 2015-09-12T17:08:19.733 に答える