問題タブ [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
1306 参照

scala - Akka-Stream パイプラインをデバッグするにはどうすればよいですか?

ログ行が次のようになるログ ファイルの処理パイプラインを構築しようとしています。

この特定のログ ファイルについては、処理したい約193705 のログ行があります。

最初は次のように見えるフローグラフを作成しました

しかし、その後、シンクでの取得量が大幅に減っていることに気付きました

そのため、すべての線がパイプを通過するかどうかを確認するために、グラフを線形にしました。私の現在のコードは次のようになります

プログラムを再度実行すると、上記の数値に近い行が再び生成されます (まったく同じではありません)。

私は混乱していて、ここで助けを求めていました

  1. ストリームは失敗したり例外をスローしたりしませんが、生成される出力行ははるかに少なくなります。これをデバッグするにはどうすればよいですか?
  2. 1500maximumFrameLengthですが、ログ行はそれよりも大きくなる可能性があります (文字数が増える)。それは問題になる可能性がありますか?そのような場合、どうすれば解決できますか?
  3. どうすれば確認できますか? 私のコードによると、 Materializerを呼び出したときにが返されないためrun()、 をシャットダウンできません ActorSystem。何が欠けていますか?

アップデート

ある行で停止することがわかりました2564 chars(bytes)

であるために停止しline 15071ます2564 bytes

しかし、なぜ例外がスローされないのですか? どうすればこれを処理できますか?

0 投票する
0 に答える
202 参照

java - ActorPublisher のどこで SupervisorStrategy を指定する必要がありますか?

ActorPublisherソースであるストリームを作成しました。SupervisionStrategyストリームのマテリアライザーを設定しました:

ActorPublisherただし、例外がスローされている場合、作成された戦略は使用されません。でオーバーライドも試みましsupervisorStrategy()MyActorPublisher。しかし、それは子役にのみ使用され、機能していないことを理解しました。

0 投票する
1 に答える
2979 参照

scala - akka-http チャンクされた HTTP レスポンスを継続的に送信 (ストリーム)

akka-httpクライアントとサーバーを使用したこの大まかなテスト例があります。

サーバー.scala:

Client.scala:

現時点Serverでは、単一の「テスト」で応答するだけです。

1秒ごとに無限ループで「テスト」をチャンク(ストリーム)として送信するように変更するHttpResponseにはどうすればよいですか?Server

0 投票する
0 に答える
61 参照

java - アクターサブスクライバーが死ぬと、ストリームはどうなりますか?

Akka Streams (java api) では、ActorSubscriber が停止するとアップストリームで何が起こりますか? ストリームはキャンセルされますか?私は akka 2.4 を使用し、akka ストリームは実験的な 1.0 と jdk 8 を使用しています。

コードは次のようになります。

0 投票する
3 に答える
3326 参照

scala - akka-http チャンク レスポンスの連結

akka-httpチャンクされた応答を返す http サービスに要求を行うために使用しています。関連するコードは次のようになります。

コマンドラインで生成される出力は次のようになります。

データの論理部分 - この場合の json は行末記号 で終わります\r\nが、問題は、上記の例で明確にわかるように、json が常に単一の http 応答チャンクに収まるとは限らないことです。

私の質問は、受信したチャンク データを完全な json に連結して、結果のコンテナー タイプが残るようにするにはどうすればよいですSource[Out,M1]Flow[In,Out,M2]? の理念を貫きたいと思いakka-streamます。

更新:応答は無限であり、集計はリアルタイムで行う必要があることにも言及する価値があります

0 投票する
1 に答える
3665 参照

scala - データを収集する Akka-Streams (ソース -> フロー -> フロー (収集) -> シンク)

私は Scala と Akka の初心者です。私は単純な RunnableFlow を持っています:

今、私はこのようなものが欲しいです:

ただし、Flow2 は、Flow1 からの 100 個の要素が利用可能になるまで待機し、その後、これらの 100 個の要素を新しい要素 (Flow1 からの 100 個の要素すべてを必要とする) に変換し、この新しい要素をシンクに渡す必要があります。

いくつかの調査を行い、明示的なユーザー定義のバッファーを見つけましたが、flow2 の flow1 から 100 個の要素すべてにアクセスし、それらを使用して何らかの変換を行う方法がわかりません。誰かがそれを説明できますか?または、小さな簡単な例を投稿することをお勧めしますか? または両方?

0 投票する
2 に答える
2984 参照

scala - Akka-Stream の実装は、シングル スレッドの実装よりも遅い

2015 年 10 月 30 日からの更新


Roland Kuhn Awnser に基づく:

Akka Streams は、アクタ間の非同期メッセージ パッシングを使用して、ストリーム処理ステージを実装しています。非同期境界を越えてデータを渡すと、ここに表示されているオーバーヘッドがあります。ストリーミング ソリューションでは要素あたり約 1 マイクロ秒かかるのに対し、計算には約 160 ナノ秒しかかからないように見えますが (シングル スレッドの測定から得られます)、これはメッセージ パッシングによって支配されます。

もう 1 つの誤解は、「ストリーム」という言葉は並列処理を意味するというものです。コードでは、すべての計算が 1 つのアクタ (マップ ステージ) で順次実行されるため、基本的なシングル スレッド ソリューションに勝るメリットは期待できません。

Akka Streams によって提供される並列処理を利用するには、それぞれが次のタスクを実行する複数の処理ステージが必要です。

要素あたり 1µs。ドキュメントも参照してください。

私はいくつかの変更を行いました。私のコードは次のようになります。

何か完全に間違っているかどうかはわかりませんが、それでも akka-streams を使用した実装ははるかに遅くなります (以前と同じようにさらに遅くなります)。 -ストリームが高速になります。したがって、正しく理解できれば (それ以外の場合は修正してください)、私の例ではオーバーヘッドが多すぎるようです。コードが重い仕事をしなければならない場合にのみ akka-streams の恩恵を受けるということですか?




私はscalaとakka-streamの両方で比較的新しいです。カウンターが特定の数に達するまでいくつかのイベントを作成する小さなテスト プロジェクトを作成しました。イベントごとに、イベントの 1 つのフィールドの階乗が計算されます。これを2回実装しました。1 回は akka-stream を使用し、もう 1 回は akka-stream を使用せずに (シングル スレッド)、ランタイムを比較しました。

私はそれを期待していませんでした: 単一のイベントを作成すると、両方のプログラムの実行時間はほぼ同じです。しかし、70,000,000 個のイベントを作成すると、akka-stream を使用しない実装の方がはるかに高速です。これが私の結果です(次のデータは24回の測定に基づいています):


  • akka-streams なしの単一イベント: 403 (± 2)ms
  • akka-streams を使用した単一イベント: 444 (+-13)ms


  • akka-streams のない 70Mio イベント: 11778 (+-70)ms

  • akka-steams での 70Mio イベント: 75424(+-2959)ms

だから私の質問は:何が起こっているのですか?akka-stream を使用した実装が遅いのはなぜですか?

ここに私のコード:

Akka による実装

Akka を使用しない実装

オブジェクトシングルスレッド{

共有機能

実装イベント