227

Akka Streams ライブラリには、すでに非常に豊富なドキュメントが付属しています。しかし、私にとっての主な問題は、それがあまりにも多くの資料を提供することです.私は、学ばなければならない概念の数に圧倒されています. そこに示されている多くの例は非常に重く感じられ、実際のユースケースに簡単に変換することができないため、非常に難解です。すべてのビルディング ブロックを一緒に構築する方法や、特定の問題の解決にどのように役立つかを説明せずに、詳細を提供しすぎていると思います。

ソース、シンク、フロー、グラフ ステージ、部分グラフ、マテリアライゼーション、グラフ DSL などがありますが、どこから始めればよいかわかりません。クイック スタート ガイドは出発点となることを意図していますが、理解できません。上記の概念を説明せずに投げ込むだけです。さらに、コード例を実行することはできません - テキストをたどることが多かれ少なかれ不可能になる欠落部分があります。

ソース、シンク、フロー、グラフ ステージ、部分グラフ、マテリアライゼーション、および私が見逃した他のいくつかの概念について、簡単な言葉と、すべての詳細を説明していない簡単な例を使用して説明できますか (いずれにせよ、これらはおそらく必要ありません)。始まり)?

4

1 に答える 1

518

akka-streamこの回答はバージョンに基づいてい2.4.2ます。API は、他のバージョンでは若干異なる場合があります。依存関係はsbtで消費できます:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

よし、始めよう。Akka Streams の API は、主に 3 つのタイプで構成されています。Reactive Streamsとは対照的に、これらの型はより強力であり、したがってより複雑です。すべてのコード例について、次の定義がすでに存在していると想定されています。

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

import型宣言にはステートメントが必要です。systemAkka のアクター システムをmaterializer表し、ストリームの評価コンテキストを表します。この場合、 を使用しますActorMaterializer。これは、ストリームがアクターの上で評価されることを意味します。どちらの値も としてマークされているimplicitため、Scala コンパイラーはこれら 2 つの依存関係が必要なときにいつでも自動的に注入できるようになります。system.dispatcherの実行コンテキストであるもインポートしFuturesます。

新しい API

Akka ストリームには、次の主要なプロパティがあります。

  • それらはReactive Streams 仕様を実装しており、その 3 つの主要な目標であるバックプレッシャー、非同期および非ブロッキング境界、異なる実装間の相互運用性は、Akka Streams にも完全に適用されます。
  • これらは、 と呼ばれるストリームの評価エンジンの抽象化を提供しますMaterializer
  • Sourceプログラムは、再利用可能なビルディング ブロックとして定式化され、3 つの主なタイプとして表されSinkますFlow。構築ブロックはグラフを形成し、その評価は に基づいており、Materializer明示的にトリガーする必要があります。

以下では、3 つの主要なタイプの使用方法をより深く紹介します。

ソース

ASourceはデータ作成者であり、ストリームへの入力ソースとして機能します。それぞれSourceに 1 つの出力チャネルがあり、入力チャネルはありません。すべてのデータは、出力チャネルを介して に接続されているものに流れますSource

ソース

boldradius.comから取得した画像。

は複数のSource方法で作成できます。

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

上記のケースでは、 に有限のデータを与えましたSource。つまり、それらは最終的に終了します。Reactive Streams はデフォルトで遅延して非同期であることを忘れてはなりません。これは、ストリームの評価を明示的に要求する必要があることを意味します。Akka Streams では、run*メソッドを使用してこれを行うことができます。これrunForeachは、よく知られているforeach関数と同じrunです。追加により、ストリームの評価を要求することが明示されます。有限のデータは退屈なので、無限のデータを続けます。

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

メソッドを使用すると、take無期限に評価することを防ぐ人工的な停止点を作成できます。アクターのサポートが組み込まれているため、アクターに送信されるメッセージをストリームに簡単にフィードすることもできます。

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Futuresが異なるスレッドで非同期に実行されていることがわかります。これが結果を説明しています。上記の例では、着信要素のバッファは必要ないためOverflowStrategy.fail、ストリームがバッファ オーバーフローで失敗するように構成できます。特にこのアクター インターフェイスを介して、任意のデータ ソースからストリームをフィードできます。データが同じスレッドによって作成されたのか、別のスレッドによって作成されたのか、別のプロセスによって作成されたのか、またはインターネット経由でリモート システムから取得されたのかは問題ではありません。

シンク

ASinkは基本的に a の反対ですSource。これはストリームのエンドポイントであるため、データを消費します。ASinkには 1 つの入力チャネルがあり、出力チャネルはありません。Sinks特に、ストリームを評価せずに再利用可能な方法でデータ コレクターの動作を指定する場合に必要です。既知のrun*メソッドではこれらのプロパティを使用できないため、Sink代わりに使用することをお勧めします。

シンク

boldradius.comから取得した画像。

動作中の短い例Sink:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Sourceaを a に接続するにSinkは、toメソッドを使用します。いわゆる を返します。これは、メソッドを呼び出すだけで実行できるRunnableFlow特殊な形式の a を後で見ることができます。Flowrun()

実行可能なフロー

boldradius.comから取得した画像。

もちろん、シンクに到着したすべての値をアクターに転送することもできます。

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

フロー

データ ソースとシンクは、Akka ストリームと既存のシステムの間の接続が必要な場合に最適ですが、実際には何もできません。フローは、Akka Streams の基本抽象化に欠けている最後の要素です。それらは異なるストリーム間のコネクタとして機能し、その要素を変換するために使用できます。

フロー

boldradius.comから取得した画像。

aFlowが a new に接続されている場合、結果はSourcea newSourceです。同様に、 にFlow接続されたSinkは新しい を作成しSinkます。そして、 aと a のFlow両方に接続された a は、 a になります。したがって、それらは入力チャネルと出力チャネルの間に位置しますが、 aまたは aに接続されていない限り、単独ではいずれのフレーバーにも対応しません。SourceSinkRunnableFlowSourceSink

フルストリーム

boldradius.comから取得した画像。

をよりよく理解するためにFlows、いくつかの例を見てみましょう。

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

viaメソッドを介して、Sourceとを接続できFlowます。コンパイラは入力タイプを推測できないため、入力タイプを指定する必要があります。この単純な例ですでにわかるように、フローinvertとフローdoubleはデータのプロデューサーとコンシューマーから完全に独立しています。データを変換して出力チャネルに転送するだけです。これは、複数のストリーム間でフローを再利用できることを意味します。

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1およびs2完全に新しいストリームを表します。ビルディング ブロックを介してデータを共有することはありません。

無制限のデータ ストリーム

先に進む前に、Reactive Streams の重要な側面のいくつかを再検討する必要があります。無制限の数の要素が任意の時点に到着し、ストリームをさまざまな状態にすることができます。通常の状態である実行可能なストリームのほかに、ストリームは、エラーによって、またはそれ以上データが到着しないことを示す信号によって停止される場合があります。次の例のように、タイムラインにイベントをマークすることで、ストリームをグラフィカルにモデル化できます。

ストリームが時間順に並べられた一連の進行中のイベントであることを示します

The Introduction to Reactive Programming you've been missingから取得した画像。

前のセクションの例ですでに実行可能なフローを見てきました。RunnableGraphストリームが実際に実体化できるときはいつでもを取得します。つまり、Sinkが に接続されていることを意味しSourceます。これまでのところ、常に valueUnitに実体化されています。これは型で確認できます。

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

forSourceSink2 番目の型パラメーターと forFlowの 3 番目の型パラメーターは、具体化された値を示します。この回答全体を通して、具体化の完全な意味は説明されません。ただし、マテリアライゼーションの詳細については、公式ドキュメントを参照してください。現時点で知っておく必要があるのは、実体化された値は、ストリームを実行したときに得られるものだということだけです。これまでは副作用にしか興味がなかったのでUnit、具体化された値として取得しました。これに対する例外は、シンクの具体化であり、Future. それは私たちにFuture、この値は、シンクに接続されているストリームがいつ終了したかを示すことができるためです。これまでのところ、前のコード例は概念を説明するのに適していましたが、有限のストリームまたは非常に単純な無限のストリームしか扱っていなかったため、退屈でもありました。より興味深いものにするために、以下では、完全な非同期で無制限のストリームについて説明します。

クリックストリームの例

例として、クリック イベントをキャプチャするストリームが必要です。さらに難しくするために、短い時間内に次々と発生するクリック イベントをグループ化するとします。このようにして、ダブル、トリプル、または 10 回のクリックを簡単に検出できます。さらに、すべてのシングル クリックを除外したいと考えています。深呼吸して、その問題を命令的な方法で解決する方法を想像してみてください。最初の試行で正しく機能するソリューションを実装できる人はいないでしょう。反応的な方法で、この問題は簡単に解決できます。実際、ソリューションは非常にシンプルで簡単に実装できるため、コードの動作を直接説明する図で表現することもできます。

クリック ストリームの例のロジック

The Introduction to Reactive Programming you've been missingから取得した画像。

灰色のボックスは、あるストリームが別のストリームに変換される方法を説明する関数です。throttle250 ミリ秒以内にクリックを累積する関数を使用すると、関数mapfilter関数は一目瞭然です。色のオーブはイベントを表し、矢印はイベントが関数内をどのように流れるかを示しています。処理ステップの後半では、ストリームを通過する要素がますます少なくなります。これは、要素をグループ化してフィルターで除外するためです。この画像のコードは次のようになります。

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

ロジック全体は、わずか 4 行のコードで表すことができます。Scala では、さらに短く書くことができます。

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

の定義はclickStreamもう少し複雑ですが、クリック イベントのキャプチャが簡単にできない JVM でサンプル プログラムが実行されるため、これは単なるケースです。もう 1 つの問題は、Akka がデフォルトでこのthrottle機能を提供していないことです。代わりに、自分でそれを書かなければなりませんでした。この関数は (mapまたはfilter関数の場合と同様に) さまざまなユース ケースで再利用できるため、これらの行をロジックの実装に必要な行数としてカウントしません。ただし、命令型言語では、ロジックを簡単に再利用できず、さまざまな論理ステップが順次適用されるのではなく、すべて 1 か所で発生するのが普通です。完全なコード例は、要旨であり、ここではこれ以上説明しません。

SimpleWebServer の例

代わりに議論すべきことは、別の例です。クリック ストリームは、Akka Streams が実際の例を処理できるようにするための良い例ですが、実際の並列実行を示す力がありません。次の例は、複数のリクエストを並行して処理できる小さな Web サーバーを表しています。Web サーバーは、着信接続を受け入れ、それらから印刷可能な ASCII 記号を表すバイト シーケンスを受信できる必要があります。これらのバイト シーケンスまたは文字列は、すべての改行文字で小さな部分に分割する必要があります。その後、サーバーは各分割行でクライアントに応答します。別の方法として、行で別の処理を行って特別な応答トークンを提供することもできますが、この例では単純に保ちたいので、派手な機能は導入しません。覚えて、サーバーは同時に複数のリクエストを処理できる必要があります。これは基本的に、リクエストが他のリクエストの実行をブロックすることを許可しないことを意味します。これらの要件をすべて解決することは、命令的な方法では難しい場合がありますが、Akka Streams を使用すると、これらのいずれかを解決するのに数行しか必要ありません。まず、サーバー自体の概要を見てみましょう。

サーバ

基本的に、主要な構成要素は 3 つだけです。最初のものは着信接続を受け入れる必要があります。2 番目は着信要求を処理する必要があり、3 番目は応答を送信する必要があります。これら 3 つのビルディング ブロックをすべて実装するのは、クリック ストリームを実装するよりも少しだけ複雑です。

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

この関数mkServerは、(サーバーのアドレスとポートに加えて) アクター システムとマテリアライザーも暗黙のパラメーターとして受け取ります。サーバーの制御フローは で表されbinding、着信接続のソースを受け取り、着信接続のシンクに転送します。シンクであるの内部では、後述するconnectionHandlerフローによってすべての接続を処理します。を返しますserverLogicbindingFutureこれは、サーバーが起動されたとき、または起動に失敗したときに完了します。これは、ポートが別のプロセスによって既に使用されている場合に発生する可能性があります。ただし、応答を処理するビルディング ブロックが表示されないため、コードはグラフィックを完全には反映していません。これは、接続自体がこのロジックをすでに提供しているためです。これは双方向フローであり、前の例で見たフローのような単方向フローではありません。具現化の場合と同様に、このような複雑なフローについてはここでは説明しません。公式ドキュメントには、より複雑なフロー グラフをカバーするための資料がたくさんあります。今 のところTcp.IncomingConnection、 がリクエストの受信方法と応答の送信方法を知っている接続を表していることを知っていれば十分です。まだ足りない部分は、serverLogic建築用ブロック。次のようになります。

サーバーロジック

繰り返しになりますが、ロジックをいくつかの単純なビルディング ブロックに分割し、それらをまとめてプログラムのフローを形成することができます。最初に、一連のバイトを複数の行に分割します。これは、改行文字を見つけるたびに行う必要があります。その後、生のバイトを扱うのは面倒なので、各行のバイトを文字列に変換する必要があります。全体として、複雑なプロトコルのバイナリ ストリームを受信する可能性があり、受信した生データの処理が非常に困難になります。読み取り可能な文字列を取得したら、回答を作成できます。簡単にするために、私たちの場合、答えは何でもかまいません。最後に、ネットワーク経由で送信できる一連のバイトに応答を戻す必要があります。ロジック全体のコードは次のようになります。

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

を受け取り、を生成するserverLogic必要があるフローであることは既にわかっています。を使用すると、 aを小さな部分に分割できます。この場合、改行文字が発生するたびに発生する必要があります。すべての分割バイト シーケンスを取得し、それらを文字列に変換するフローです。もちろん、印刷可能な ASCII 文字のみを文字列に変換する必要があるため、これは危険な変換ですが、私たちのニーズには十分です。は最後のコンポーネントであり、回答を作成し、回答を一連のバイトに変換します。グラフィックとは対照的に、この最後のコンポーネントを 2 つに分割しませんでした。これは、ロジックが単純であるためです。最後に、すべてのフローをByteStringByteStringdelimiterByteStringreceiverrespondervia関数。この時点で、冒頭で述べたマルチユーザー プロパティを処理したかどうかを尋ねる人がいるかもしれません。実際、すぐにはわからないかもしれませんが、私たちはそうしました。この図を見ると、より明確になるはずです。

サーバーとサーバーロジックの組み合わせ

コンポーネントは、serverLogicより小さなフローを含むフローに他なりません。このコンポーネントは、要求である入力を受け取り、応答である出力を生成します。フローは複数回構築でき、それらはすべて互いに独立して機能するため、このネストによってマルチユーザー プロパティを実現します。すべてのリクエストは独自のリクエスト内で処理されるため、実行時間の短いリクエストが、以前に開始された実行時間の長いリクエストをオーバーランする可能性があります。ご参考までに、serverLogic前に示した の定義は、もちろん、その内部定義のほとんどをインライン化することで、はるかに短く書くことができます。

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Web サーバーのテストは次のようになります。

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

上記のコード例が正しく機能するためには、最初にサーバーを起動する必要があります。これはstartServerスクリプトで示されています。

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

この単純な TCP サーバーの完全なコード例は、ここにあります。Akka Streams を使用してサーバーを作成できるだけでなく、クライアントも作成できます。次のようになります。

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

完全なコードの TCP クライアントは、ここにあります。コードは非常によく似ていますが、サーバーとは対照的に、着信接続を管理する必要はもうありません。

複雑なグラフ

前のセクションでは、フローから単純なプログラムを構築する方法を見てきました。ただし、実際には、組み込み関数に依存してより複雑なストリームを構築するだけでは不十分な場合がよくあります。任意のプログラムに Akka Streams を使用できるようにしたい場合は、アプリケーションの複雑さに対処できる独自のカスタム制御構造と組み合わせ可能なフローを構築する方法を知る必要があります。良いニュースは、Akka Streams がユーザーのニーズに合わせてスケーリングするように設計されていることです。Akka Streams のより複雑な部分を簡単に紹介するために、クライアント/サーバーの例にいくつかの機能を追加します。

まだできないことの 1 つは、接続を閉じることです。これまで見てきたストリーム API では任意の時点でストリームを停止することができないため、この時点で少し複雑になり始めます。ただし、GraphStage任意の数の入力ポートまたは出力ポートを持つ任意のグラフ処理ステージを作成するために使用できる抽象化があります。最初にサーバー側を見てみましょう。ここでは、 という新しいコンポーネントを導入しますcloseConnection

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

この API は、フロー API よりもはるかに扱いにくいように見えます。当然のことながら、ここで多くの必須の手順を実行する必要があります。その代わりに、ストリームの動作をより細かく制御できます。上記の例では、1 つの入力ポートと 1 つの出力ポートのみを指定し、shape値をオーバーライドしてシステムで使用できるようにします。さらに、いわゆる と を定義しましInHandlerOutHandler。これらは、この順序で要素の受信と放出を担当します。完全なクリック ストリームの例をよく見ると、これらのコンポーネントをすでに認識しているはずです。InHandler要素を取得し、それが 1 文字の文字列である場合は、'q'ストリームを閉じます。クライアントにストリームがすぐに閉じられることを知る機会を与えるために、文字列を発行します"BYE"その後、すぐにステージを閉じます。このコンポーネントは、フローに関するセクションで紹介しcloseConnectionたメソッドを介してストリームと組み合わせることができます。via

接続を閉じることができることに加えて、新しく作成された接続にウェルカム メッセージを表示できるとよいでしょう。これを行うには、もう少し先に進む必要があります。

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

この関数serverLogic は、着信接続をパラメーターとして受け取るようになりました。その本体の内部では、複雑なストリームの動作を記述できるようにする DSL を使用します。ではwelcome、1 つの要素 (ウェルカム メッセージ) のみを発行できるストリームを作成します。は、前のセクションでlogic説明したとおりです。serverLogic唯一の注目すべき違いは、追加closeConnectionしたことです。ここで実際に DSL の興味深い部分が登場します。この関数は、ストリームをグラフとして表現するために使用されるGraphDSL.createビルダーを使用可能にします。bこの~>機能により、入力ポートと出力ポートを相互に接続することができます。このConcat例で使用されているコンポーネントは要素を連結でき、ここではウェルカム メッセージを他の要素の前に追加するために使用されています。internalLogic. 最後の行では、サーバー ロジックの入力ポートと連結ストリームの出力ポートのみを使用できるようにします。これは、他のすべてのポートがserverLogicコンポーネントの実装の詳細のままであるためです。Akka Streams のグラフ DSL の詳細な紹介については、公式ドキュメントの対応するセクションを参照してください。複雑な TCP サーバーとそれと通信できるクライアントの完全なコード例は、ここにあります。クライアントから新しい接続を開くと、ウェルカム メッセージが表示されます"q"。クライアントで入力すると、接続がキャンセルされたことを示すメッセージが表示されます。

この回答でカバーされていないトピックがまだいくつかあります。特にマテリアライゼーションは、ある読者や別の読者を怖がらせるかもしれませんが、ここでカバーされている資料があれば、誰もが自分で次のステップに進むことができるはずです. 既に述べたように、公式ドキュメントは Akka Streams について学び続けるのに適した場所です。

于 2016-01-31T22:08:42.200 に答える