問題タブ [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
356 参照

java - Java-Akka: 複数のアクターからのメッセージを組み立てる

私はAkkaが初めてです。4人の俳優がいます。

Actor1 は Message1 を Actor4 に送信します。Actor2 は Message2 を Actor4 に送信します。Actor3 は Message3 を Actor4 に送信します。

Actor4 は、Message1 タイプのメッセージを 1 つ、Message2 タイプのメッセージを 1 つ、Message3 タイプのメッセージを 2 つ受信すると、Message4 を作成する必要があります。

メッセージ 4 = (メッセージ 1、メッセージ 2、メッセージ 3a、メッセージ 3b、メッセージ 3c)

これを行うための最良のアプローチは何ですか?

0 投票する
1 に答える
127 参照

scala - アクターから Play Result へのデータのストリーミング

チャンクでアクターに到着するデータがあり、それらのチャンクをPlay のストリームとして返したいと考えていますResult。から応答を取得する唯一の方法Ok.streamは理想的な候補のように見えるため、次のようになります。

アクターからを返し、Enumerator[Array[Byte]]メッセージがアクターに到着すると、アクター内で列挙子にチャンクをプッシュし続けます。ただし、アクターから変更可能な Enumerator を返すことは、何らかの違反のように思えます。

これを達成するためのより適切な方法はありますか?akka-stream問題空間に対処する可能性のある抽象化であるか、抽象化であると考えましakka.ioたが、それらがどのように適用されるかわかりません。

0 投票する
2 に答える
4945 参照

postgresql - slick の 3.0.0 ストリーミング結果と Postgresql を操作する正しい方法は何ですか?

滑らかなストリーミングを操作する方法を理解しようとしています。postgresドライバーでslick 3.0.0を使用しています

状況は次のとおりです。サーバーは、サイズ(バイト単位)で制限されたチャンクに分割されたデータのシーケンスをクライアントに提供する必要があります。だから、私は次の洗練されたクエリを書きました:

私は seq を akka-streams と組み合わせSource、 custom を書きPushPullStageました。これは、データのサイズ (バイト単位) を制限し、サイズ制限に達したときにアップストリームを終了します。それはうまく動作します。問題は、postgres のログを調べると、そのようなクエリが表示されることです。 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

そのため、一見すると、各クエリで数バイトを使用するだけで、多くの (そして不必要な) データベース クエリが実行されているように見えます。データベースのクエリを最小限に抑え、各クエリで転送されるデータを最大限に活用するために、Slick でストリーミングを行う正しい方法は何ですか?

0 投票する
2 に答える
259 参照

scala - FlowGraph を作成するとき、2 つの Flow を結合するにはどうすればよいですか?

私は、1 つのソース、1 つのシンク、およびそれらの間に 2 つの「フロー」を持つ単純なフローを構築しようとしています。だから何か

コメント行はコンパイルされませんが、私が達成しようとしていることを示しています。

この例は、3 を加算する新しい関数を定義したり、関数を構成したりするのと同じくらい簡単になるように工夫されていますが、実際には関数ははるかに複雑であり、簡単にするために分離されています。

ここでは、ファンアウトやファンインを行うつもりはありません。それらの間に任意の数の関数を含めることができる単純な流れです。

ありがとう。

0 投票する
2 に答える
10167 参照

scala - Akka-Streams HTTP で HttpResponse 本体全体を文字列として取得する

akka.http新しいライブラリの使い方を理解しようとしています。http 要求をサーバーに送信し、応答本文全体を単一の文字列として読み取り、Source[String,?].

これまでに作成できた最良のソリューションは次のとおりです。

(エラー パスの欠落を除いて) うまく機能しているように見えますが、このような単純なタスクには少し扱いに​​くいです。よりスマートなソリューションはありますか? grouped/を避けることはできますmkStringか?

0 投票する
1 に答える
726 参照

scala-2.11 - akka ストリームを使用してメッセージのみを消費する TCP レシーバーを作成するにはどうすればよいですか?

私たちは上にいます: akka-stream-experimental_2.11 1.0.

に触発された

次のように TCP レシーバーを作成しました。

ただし、私たちの意図は、受信者がまったく応答せず、メッセージをシンクするだけにすることでした。(TCP メッセージ パブリッシャーは response を気にしません)。

それは可能ですか?akka.stream.scaladsl.Tcp.IncomingConnection はタイプ Flow[ByteString, ByteString, Unit] のフローを取るため、まったく応答しません。

はいの場合、いくつかのガイダンスをいただければ幸いです。前もって感謝します。

次の 1 つの試みは私の単体テストに合格しますが、それが最善のアイデアかどうかはわかりません。

0 投票する
2 に答える
697 参照

scala-2.11 - akka.stream.io.Framing$FramingException からの回復方法

オン: akka-stream-experimental_2.11 1.0。

TCP サーバーで Framing.delimiter を使用しています。maximumFrameLength を超える長さのメッセージが到着すると、FramingException がスローされ、ActorSubscriber の OnError からキャプチャできます。

サーバーコード:

加入者コード:

問題: 最大フレーム長未満のメッセージは、その着信接続のこの例外の後、流れません。クライアントを強制終了して再実行すると、正常に動作します。

ActorSubscriber は監督を尊重しません

悪いメッセージを飛ばして次の良いメッセージに進む正しい方法は何ですか?

0 投票する
1 に答える
566 参照

scala - Akka: 具体化された値を使用してシンクの入力を変換する

シンク: があり、各要素をマッピングして、元の実体化された型と値を保持することによりSink[String, Mat]、それをシンク: に変換したいと考えています。Sink[Int, Mat]num: Int => ("num" + num): String

Matだった場合Unit、それは簡単です:

しかし、任意のシンクを変換するのはMatどうですか?