0

私は Akka-Streams で遊んでいてFlow、独自の を実装してカスタムを作ろうとしていPushPullStageます。Flowアップストリームから受信したオブジェクトをリストに蓄積し、アップストリームが完了したときにグループをダウンストリームに放出する前に、関数に従ってそれらをグループ化する必要があります。

実装するのは非常に簡単なことのように思えますが、その方法がわかりません! から複数のオブジェクトを放出する方法はないようですPushPullStage

これまでの私の実装は次のとおりです。

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        for(group <- groups)
          ctx.push(group)    // this doesn't work

        ctx.finish()
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}

編集

圧力を考慮してコードを変更しましたが、現在はすべて機能しています。基本的には、ダウンストリームFlowが意図したとおりに動作し、要素をプルし続ける必要がありました。

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        groups match {
          case Nil => ctx.finish()

          case head :: tail =>
            groups = tail
            ctx.push(head)
        }
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}
4

1 に答える 1

2

バック プレッシャーに違反するため、要求された以上にプッシュすることはできません。また、注目に値するのは、これが大規模または無制限のストリームの OutOfMemoryError で爆発するため、あなたがしようとしていることをお勧めしないことです。

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        groups match {
          case Nil => ctx.finish()
          case group :: rest =>
            groups = rest
            ctx.push(group)
        }
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}
于 2015-09-19T21:25:16.033 に答える