私は Akka-Streams で遊んでいてFlow
、独自の を実装してカスタムを作ろうとしていPushPullStage
ます。Flow
アップストリームから受信したオブジェクトをリストに蓄積し、アップストリームが完了したときにグループをダウンストリームに放出する前に、関数に従ってそれらをグループ化する必要があります。
実装するのは非常に簡単なことのように思えますが、その方法がわかりません! から複数のオブジェクトを放出する方法はないようですPushPullStage
。
これまでの私の実装は次のとおりです。
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
編集
圧力を考慮してコードを変更しましたが、現在はすべて機能しています。基本的には、ダウンストリームFlow
が意図したとおりに動作し、要素をプルし続ける必要がありました。
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}