61

Source次のように、後でプッシュ要素を作成したいと思います。

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

これを行うための推奨される方法は何ですか?

ありがとう!

4

3 に答える 3

103

これを実現するには、次の 3 つの方法があります。

1. SourceQueue を使用してマテリアライゼーションをポストする

Source.queueフローを次のように具体化するために使用できますSourceQueue

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println)
                  .run() // in order to "keep" the queue Materialized value instead of the Sink's

queue offer Weather("02139", 32.0, true)

2.アクターによるマテリアライゼーションの投稿

同様の質問と回答がここにあります。要点は、ストリームを ActorRef として実体化し、その ref にメッセージを送信することです。

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run() // in order to "keep" the ref Materialized value instead of the Sink's

ref ! Weather("02139", 32.0, true)

3.アクターによるプレマテリアライゼーション

同様に、メッセージ バッファーを含むアクターを明示的に作成し、そのアクターを使用してソースを作成し、回答の説明に従ってそのアクター メッセージを送信できます

object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true) 

//stream already has 1 Weather value to process which is sitting in the 
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
于 2015-10-29T13:11:59.757 に答える