メソッドのように機能し、要素を発行するカスタムStatefulStageを作成したいのですが、インスタンスを作成して着信要素をプッシュする方法がわかりません。スタブは次のとおりです。groupBy
Source[A, Unit]
Source[A, Unit]
class GroupBy[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
override def initial: StageState[A, Source[A, Unit]] = new StageState[A, Source[A, Unit]] {
override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
val source: Source[A, Unit] = ... // Need to create source here
// and push `elem` to `source` here
emit(List(source).iterator, ctx)
}
}
}
次のスニペットを GroupBy フローのテストに使用できます (作成されたストリームからイベントを出力する必要があります)。
case class Tick()
case class Event(timestamp: Long, sessionUid: String, traffic: Int)
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
var rnd = Random
rnd.setSeed(1)
val eventsSource = Source
.tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick)
.map {
case _ => Event(System.currentTimeMillis / 1000, s"session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
}
val flow = Flow[Event]
.transform(() => new GroupByUntil)
.map {
(source) => source.runForeach(println)
}
eventsSource
.via(flow)
.runWith(Sink.ignore)
.onComplete(_ => system.shutdown())
誰も私にそれを行う方法を説明できますか?
アップデート:
この回答に基づいて次のonPush
メソッドを作成しましたが、イベントは出力されませんでした。私が理解しているように、フローの一部として実行されている場合にのみ要素をソースにプッシュできますが、テスト スニペットの外でフローを実行したいと考えています。この例のようにフローを実行すると、イベントが処理されて に送信されます。これが、テスト スニペットがイベントを出力しなかった理由だと思います。GroupBy
GroupBy
Sink.ignore
override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
val source: Source[A, ActorRef] = Source.actorRef[A](1000, OverflowStrategy.fail)
val flow = Flow[A].to(Sink.ignore).runWith(source)
flow ! elem
emit(List(source.asInstanceOf[Source[A, Unit]]).iterator, ctx)
}
それで、それを修正する方法は?