問題タブ [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
572 参照

scala - Iteratees エラー/例外処理 vs リアクティブ ストリーム/akka-stream

驚いたことに、Iteratees とエラー処理にいくつか問題があります。

問題;

(ネットワークから、InputStream である必要があります)からいくつかのバイトを読み取りInputStream、この InputStream でいくつかのチャンク/グルーミングを実行し (作業分散のために)、これcase class DataBlock(blockNum: Int, data: ByteString)をアクターに送信するための に変換する変換が続きます (Array[Bytes] は に変換されますCompactByteString)。

流れ;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

コード;

質問;

私の現在の Iteratee コードはうまく機能します。ただし、どちらの側でも障害を処理できるようにしたいと考えています。

  1. InputStreamreadメソッドが失敗した場合 - 正常に処理されたバイト数/ブロック数を知り、その時点からストリームの読み取りを再開したい。列挙子を読み込むとエラーがスローされ、fut例外が返されるだけで、状態がないため、rxing アクターに渡さない限り、現在どのブロックにいるのかわかりません (これはやりたくないことです)。
  2. 出力側が失敗した場合、またはアクタのバッファが入力ストリームからの読み取りを完全に保持しているため、DataBlock メッセージを受信できなくなった場合

どうすればいいですか?

定義済みのエラー処理が必要なため、Play の iteratees よりも react-streams/Akka-stream (実験的) または scalaz iteratees を使用してこれを試した方がよいでしょうか?

0 投票する
1 に答える
526 参照

scala - akka ストリームでの mapFuture の使用

私はAkka Streamsで遊んでいて、MongoDBコレクションからポーリングされたイベントの強化と処理を試みています。ただし、外部データ ソースに接続する必要がある可能性があるイベント エンリッチャーを実装する最善の方法については、疑問があります。

mapFuture は適しているようですが、いくつか問題があります。

そして私のアプリ:

しかし、私はこのエラーで立ち往生しています:

mapFuture への呼び出しで。

私は何が欠けていますか?

これらのエンリッチメントに対処するためのより良いアイデアはありますか?

スタック トレースを更新します。

ありがとう

0 投票する
2 に答える
4907 参照

scala - Akka Streams: いくつかのフローが完了するまで待つ方法

Flow私のプログラムには、並行して処理したいいくつかの があります。すべてが完了したら、何らかのアクションをトリガーしたいと思います。

これを行う 1 つの方法は、各完了後にアクターにメッセージを送信することです。アクターがすべてのフローの準備ができていることを確認すると、アクションをトリガーできます。

akka-streams Scala DSL 内に、見落としている可能性のあるもので、さらに簡単にするものがあるかどうか疑問に思っていました。

EDIT : ドキュメントに記載されているように、Future はストリームで発生した最初のイベントの直後に完了するため、Flow を Future に変換することはできません。次のコードを実行します。

「tick」、「future completed」、そして「tick」の無限のシーケンスが続きます。

0 投票する
1 に答える
1843 参照

akka - Akka-Http を使用したスト​​リーミング ビデオまたは (長さ不明のストリーム)

私は実験プロジェクトの akka-http(akka-http-experimental_2.11 - 0.4) に取り組んでいます。私は以前にスプレーに取り組んだことがありません。

mp4 ビデオ (サイズは異なる場合があります) をブラウザーにストリーミングしたいと思います。しかし、HttpResponse (HttpEntity.Chunked ?) の HttpEntity を作成する方法がわかりません。私はこのような汚いことを試しましたが、これは正しい方法ではありませんが、これは Firefox で単一の要求に対してのみ機能します。

別のタブまたはブラウザーで同じ URL を開くと、サーバーはその要求を処理できません。これは実験的なプロジェクトであるため、大きなファイルのストリーミングに関する十分なドキュメントがありません。

サンプルソースコードフォームhttps://github.com/akka/akka/blob/release-2.3-dev/akka-http-core/src/test/scala/akka/http/TestServer.scalaを入手しました

HttpEntity.Chunked のプロデューサーを作成する方法を知る必要があります。簡単な言葉で説明できる人がいれば、API を理解するのに役立ちます。

ありがとうございました。

(PS: スタック オーバーフローで Akka-Http タグを作成してください)

0 投票する
0 に答える
1455 参照

scala - Akka Stream での例外/エラー処理

次のダクトを定義しました。

およびフロー:

Augmenter は次のようになります。

ここで、Augmenter1 の例外につながる条件が満たされた場合、フローは例外をスローすることなく、例外の最初のインスタンスで (正常に) 終了します。2 つのことを実行できるようにしたいと考えています。例外をチェーンでキャッチし、次のイベントにスキップします。

私の質問: フロー内のエラー/例外を処理する適切な方法は何ですか?

ありがとう

0 投票する
4 に答える
4405 参照

scala - Akka Streams を使用して区切り文字で受信ストリームを分割する方法

私は実験的な Akka Streams API を少しいじっていましたが、実装方法を確認したいユースケースがあります。私のユースケースでは、接続の入力ストリームをサーバーソケットにバインドすることから供給されるStreamTcpベースがあります。Flow私が持っているフローは、ByteString入ってくるデータに基づいています。入ってくるデータには区切り文字が含まれます。つまり、区切り文字の前のすべてを 1 つのメッセージとして扱い、次の区切り文字までのすべてを次のメッセージとして扱う必要があります。ソケットを使用せず、静的テキストのみを使用して、より単純な例を試してみると、次のようになりました。

Flow目標を達成するために私が見つけた の主な機能は でした。これは、区切り文字splitWhenごとにメッセージごとに 1 つずつ、追加のサブフローを生成します。.次に、各サブフローを別のステップ パイプラインで処理し、最後に個々のメッセージを出力します。

これは、かなり単純で一般的な使用例であると私が考えていたことを実現するには、少し冗長に思えます。だから私の質問は、これを行うためのよりクリーンで冗長でない方法はありますか、それともストリームを区切り文字で分割するための正しい方法であり、好ましい方法ですか?

0 投票する
1 に答える
1494 参照

scala - Akka Reactive ストリームのテスト

次の方法で取得した発信ストリーム TCP 接続を介してメッセージをストリーミングするコードをテストしています。

私のテストでは、結果Subscriber[ByteString]をダミーのサブスクライバーに置き換え、いくつかの送信メッセージをトリガーし、期待どおりに到着したことをアサートします。以下のメソッドを使用して、ダミーのサブスクライバーとストリーム結果の未来を生成します。(ここまでは順調ですね)

私の質問は次のとおりです。ストリームが期待値を出力することをテストするための標準的な方法はありますTestActorRefか? そうでない場合、上記の関数に似たライブラリ関数はありますか?

0 投票する
1 に答える
412 参照

akka - サブスクライバにルーティング機能を提供する方法

ストリームの次のパスがあります-

これを akka ストリームの方法で記述する方法がわかりません。フォローしてみました(疑似)。

k1、k2、k3 は kafka ストリーム パブリッシャーです。

これがパブリッシャをシンクに接続する方法です。しかし、ここで解決したい問題は遅い IO です。IOHandler アクターはメッセージの処理が非常に遅いため、複数の IOHandler を使用してタスクを分散するにはどうすればよいでしょうか。また、バックプレッシャーを維持したいので、火を使わず、ルーターを使用することを忘れています。

私はakkaストリームに非常に慣れていないので、方法を提案してください。

ありがとう

0 投票する
2 に答える
1580 参照

scala - 例外が Akka Stream フローを停止しないのはなぜですか

API 応答に依存するフローがあります。応答が期待どおりでない場合、例外がスローされます。この戦略は、Spray と spec2 を使用した直接メソッド テストでうまく機能します。

ただし、例外をスローするモジュールでストリームを使用しようとすると、フローは単に停止します。

これが私の流れです:

これに対する私の戦略はmap、先物に使用することです。

そのようです:

これは、まったく別の場所にある潜在的な例外スローワーです。

何かが欠けているようですが、何が見えません。ご指摘ありがとうございます。

0 投票する
1 に答える
64 参照

scala - SinkSource[T] のようなものはありますか?

SinkSourceとを提供する をSink探していますSource。要素がそれに流れ込む場合Sink、対応する で提供する必要がありますSource。次のコードは、私が何を意味するかを示しています。

実行すると、次のように表示されます: 12345
So, does a SinkSourceexist (haven't seen it in the API) or does someone know how to implement it? Sinkandへの別個のアクセスが必要であることを言及する必要があるSourceためFlow、これはこの特定の形式では解決策ではありません。