12

私は実験的な Akka Streams API を少しいじっていましたが、実装方法を確認したいユースケースがあります。私のユースケースでは、接続の入力ストリームをサーバーソケットにバインドすることから供給されるStreamTcpベースがあります。Flow私が持っているフローは、ByteString入ってくるデータに基づいています。入ってくるデータには区切り文字が含まれます。つまり、区切り文字の前のすべてを 1 つのメッセージとして扱い、次の区切り文字までのすべてを次のメッセージとして扱う必要があります。ソケットを使用せず、静的テキストのみを使用して、より単純な例を試してみると、次のようになりました。

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

    Flow(data).
      splitWhen(c => c == '.').
      foreach{producer => 
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}

Flow目標を達成するために私が見つけた の主な機能は でした。これは、区切り文字splitWhenごとにメッセージごとに 1 つずつ、追加のサブフローを生成します。.次に、各サブフローを別のステップ パイプラインで処理し、最後に個々のメッセージを出力します。

これは、かなり単純で一般的な使用例であると私が考えていたことを実現するには、少し冗長に思えます。だから私の質問は、これを行うためのよりクリーンで冗長でない方法はありますか、それともストリームを区切り文字で分割するための正しい方法であり、好ましい方法ですか?

4

4 に答える 4

2

アンドレイの使用がFramingあなたの質問に対する最良の解決策だと思いますが、同様の問題があり、Framing制限が多すぎることがわかりました。代わりに、好きなルールを使用statefulMapConcatして入力 ByteString をグループ化できるようにしました。誰かに役立つ場合の質問のコードは次のとおりです。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object BasicTransformation extends App {

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = system.dispatcher
  val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

  val grouping = Flow[Byte].statefulMapConcat { () =>
    var bytes = ByteString()
    byt =>
      if (byt == '.') {
        val string = bytes.utf8String
        bytes = ByteString()
        List(string)
      } else {
        bytes :+= byt
        Nil
      }
  }

  Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
}

生成するもの: Lorem Ipsum is simply Dummy text of the printing And typesetting industry

于 2016-11-02T09:49:55.983 に答える