問題タブ [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Iteratees エラー/例外処理 vs リアクティブ ストリーム/akka-stream
驚いたことに、Iteratees とエラー処理にいくつか問題があります。
問題;
(ネットワークから、InputStream である必要があります)からいくつかのバイトを読み取りInputStream
、この InputStream でいくつかのチャンク/グルーミングを実行し (作業分散のために)、これcase class DataBlock(blockNum: Int, data: ByteString)
をアクターに送信するための に変換する変換が続きます (Array[Bytes] は に変換されますCompactByteString)。
流れ;
InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors
コード;
質問;
私の現在の Iteratee コードはうまく機能します。ただし、どちらの側でも障害を処理できるようにしたいと考えています。
- InputStream
read
メソッドが失敗した場合 - 正常に処理されたバイト数/ブロック数を知り、その時点からストリームの読み取りを再開したい。列挙子を読み込むとエラーがスローされ、fut
例外が返されるだけで、状態がないため、rxing アクターに渡さない限り、現在どのブロックにいるのかわかりません (これはやりたくないことです)。 - 出力側が失敗した場合、またはアクタのバッファが入力ストリームからの読み取りを完全に保持しているため、DataBlock メッセージを受信できなくなった場合
どうすればいいですか?
定義済みのエラー処理が必要なため、Play の iteratees よりも react-streams/Akka-stream (実験的) または scalaz iteratees を使用してこれを試した方がよいでしょうか?
scala - Akka Stream での例外/エラー処理
次のダクトを定義しました。
およびフロー:
Augmenter は次のようになります。
ここで、Augmenter1 の例外につながる条件が満たされた場合、フローは例外をスローすることなく、例外の最初のインスタンスで (正常に) 終了します。2 つのことを実行できるようにしたいと考えています。例外をチェーンでキャッチし、次のイベントにスキップします。
私の質問: フロー内のエラー/例外を処理する適切な方法は何ですか?
ありがとう
scala - Akka Reactive ストリームのテスト
次の方法で取得した発信ストリーム TCP 接続を介してメッセージをストリーミングするコードをテストしています。
私のテストでは、結果Subscriber[ByteString]
をダミーのサブスクライバーに置き換え、いくつかの送信メッセージをトリガーし、期待どおりに到着したことをアサートします。以下のメソッドを使用して、ダミーのサブスクライバーとストリーム結果の未来を生成します。(ここまでは順調ですね)
私の質問は次のとおりです。ストリームが期待値を出力することをテストするための標準的な方法はありますTestActorRef
か? そうでない場合、上記の関数に似たライブラリ関数はありますか?
akka - フロー グラフの作成
私は Akka Streams をいじって、 を作成しFlow
、それらを s を使用して接続するというアイデアを得ましたFlowGraph
。
Akka のこの部分はまだ開発中であることはわかっています。そのため、いくつかのことが完了していない可能性があり、他のビットが変更される可能性がありますが、「完全」ではない (つまり、a に接続されていない) FlowGraph を作成して渡すことは可能ですかSink
?フローを追加してコードのさまざまな部分を拡張し、最後にシンクを追加して完成させますか?
基本的に、FlowGraph を作成できるようにしたいのですが、方法がわかりません...特に、FlowGraph が Broadcast を使用してストリームを分割した場合。
ありがとう
scala - NIO バイナリ処理に Reactive Streams を使用するには?
org.reactivestreamsライブラリを使用して Java NIO を使用して大きなデータ ストリームを処理するコード例はありますか(高パフォーマンスのため)? 私は分散処理を目指しているので、Akka を使用した例が最適ですが、それは理解できます。
scala でファイルを読み取るほとんどの (すべてではないことを願っています) 例は、Source
(非バイナリ) または直接の Java NIO (さらにはFiles.readAllBytes
!)に頼っています。
おそらく、見逃したアクティベーター テンプレートがありますか? ( Akka Streams with Scala!は、バイナリ/NIO 側を除いて、私が必要とするすべてに密接に対応しています)
reactive-programming - 2 つのストリームを (null なしで) マージし、ペアに条件を適用する方法は?
2 つのデータ ストリームがあるとします。それらをマージして、これら 2 つのストリーム間のデータに条件を適用する方法はありますか? 例えば
rxjsを使用して上記の合成ストリームを取得するには? いくつかの通知を発生させるために、構成されたストリームに条件を適用したいと思います。また、最後の既知の null 以外のデータを使用することもできます。たとえば、以下の構成されたストリームを参照してください。
リアクティブ ストリームのアイデアを使い始めたばかりなので、リアクティブ ストリームのアイデアを誤解している場合は修正してください。
scala - マテリアライズド Akka Stream フロー グラフを公開またはサブスクライブするにはどうすればよいですか?
私は Akka Stream をいじっており、実体化後の柔軟性を理解しようとしています。
これを行う 1 つの方法は、低レベルのリアクティブ ストリーム API を使用することです: http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource
ただし、パブリッシュまたはサブスクライブするこれらのポイントを定義する必要があります。任意のマテリアライズド フロー グラフ ノードを公開またはサブスクライブする方法はありますか? マテリアライズド フロー グラフはアクターの集まりにすぎないため、これは可能です。
例: 最初に、フロー グラフ 1 をデプロイします: A ~> B ~> C
次に、フロー グラフ 2 と 3 をデプロイします: D ~> BB ~> E