2

私はまだ Akka ストリームの概念を把握しており、アトミックな方法で処理する必要があるアイテムのコレクションがある場合に、それらをシナリオにマッピングする方法を理解しようとしています。複数のアイテムで構成される注文書があり、各アイテムに何らかの処理を適用してから、単一の値にマージする必要があるとします。そのようなワークフローは、発注書が完全に処理されると閉じられる独自の別個のストリーム (またはサブストリーム) になる必要がありますか? つまり、各注文書は新しいストリームを開始しますか? それとも、終わりのない一連の注文書がありますか? しかし、そうであれば、異なる注文からの購入注文が混在するという問題はありませんか?

言い換えれば、私が達成しようとしているのは、さまざまなワークフローの分離を処理することであり、Akka ストリームがそれに適しているかどうか疑問に思っています。

4

1 に答える 1

2

質問に直接答えると、「各アイテムに何らかの処理を適用してから、それらを単一の値にマージする」ストリームを作成することができます。

サンプルコードを使用して例を開発する:

case class Item(itemId : String)

case class PurchaseOrder(orderId : String, items : Seq[Item])

val purchaseOrder : PurschaseOrder = ???

アイテムをストリームで処理したい場合は可能ですが、質問では削減の正確な性質があいまいだったので、折り畳みがどのように達成されるかを定義しません。

type ProcessOutput = ???

def processItem(item : Item) : ProcessOutput = ???

val combinedResult : Future[CombinedResult] = 
  Source.fromIterator( purchaseOrder.items.toIterator )
        .via(Flow[Item] map processItem)
        .to(Sink.fold[ProcessOutput](???)(???) )
        .run()

あなたの質問に間接的に答えると、

先物を第一に考える

Akka ストリームは、背圧が必要な場合に非常に便利です。バック プレッシャーは、外部データ ソースに接続する場合に一般的です。これは、bp を使用するとアプリケーションがデータのストリーミング速度を判断できるためです。これは、より多くのデータの要求を継続的に通知する必要があるためです。

質問で提示したケースでは、需要をブロードキャストする必要はなく、そのような通信に必要な固有のオーバーヘッドが発生します。あなたはすでにアイテムのコレクションを持っているので、要求を送る相手がいません...

代わりに、先物はあなたが説明したケースに最適な方法だと思います:

def futProcess(item : Item)(implicit ec : ExecutionContext) = 
  Future { processItem(item) } 

// same output type as the stream run 
val combinedResults : Future[CombinedResult] = 
  Future.sequence{ purchaseOrder.items map futProcess }
        .map{ _ fold[ProcessOutput](???)(???) }

完全なActorSystemを使用することで、パフォーマンスが向上し、複雑さが軽減され、ストリームとまったく同じ結果が得られます...

于 2016-11-13T00:40:51.707 に答える