私は実験的な Akka Streams API を少しいじっていましたが、実装方法を確認したいユースケースがあります。私のユースケースでは、接続の入力ストリームをサーバーソケットにバインドすることから供給されるStreamTcp
ベースがあります。Flow
私が持っているフローは、ByteString
入ってくるデータに基づいています。入ってくるデータには区切り文字が含まれます。つまり、区切り文字の前のすべてを 1 つのメッセージとして扱い、次の区切り文字までのすべてを次のメッセージとして扱う必要があります。ソケットを使用せず、静的テキストのみを使用して、より単純な例を試してみると、次のようになりました。
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
Flow
目標を達成するために私が見つけた の主な機能は でした。これは、区切り文字splitWhen
ごとにメッセージごとに 1 つずつ、追加のサブフローを生成します。.
次に、各サブフローを別のステップ パイプラインで処理し、最後に個々のメッセージを出力します。
これは、かなり単純で一般的な使用例であると私が考えていたことを実現するには、少し冗長に思えます。だから私の質問は、これを行うためのよりクリーンで冗長でない方法はありますか、それともストリームを区切り文字で分割するための正しい方法であり、好ましい方法ですか?