質問に直接答えると、「各アイテムに何らかの処理を適用してから、それらを単一の値にマージする」ストリームを作成することができます。
サンプルコードを使用して例を開発する:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
アイテムをストリームで処理したい場合は可能ですが、質問では削減の正確な性質があいまいだったので、折り畳みがどのように達成されるかを定義しません。
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator( purchaseOrder.items.toIterator )
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???) )
.run()
あなたの質問に間接的に答えると、
先物を第一に考える
Akka ストリームは、背圧が必要な場合に非常に便利です。バック プレッシャーは、外部データ ソースに接続する場合に一般的です。これは、bp を使用するとアプリケーションがデータのストリーミング速度を判断できるためです。これは、より多くのデータの要求を継続的に通知する必要があるためです。
質問で提示したケースでは、需要をブロードキャストする必要はなく、そのような通信に必要な固有のオーバーヘッドが発生します。あなたはすでにアイテムのコレクションを持っているので、要求を送る相手がいません...
代わりに、先物はあなたが説明したケースに最適な方法だと思います:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
完全なActorSystemを使用することで、パフォーマンスが向上し、複雑さが軽減され、ストリームとまったく同じ結果が得られます...