問題タブ [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
572 参照

scala - Iteratees エラー/例外処理 vs リアクティブ ストリーム/akka-stream

驚いたことに、Iteratees とエラー処理にいくつか問題があります。

問題;

(ネットワークから、InputStream である必要があります)からいくつかのバイトを読み取りInputStream、この InputStream でいくつかのチャンク/グルーミングを実行し (作業分散のために)、これcase class DataBlock(blockNum: Int, data: ByteString)をアクターに送信するための に変換する変換が続きます (Array[Bytes] は に変換されますCompactByteString)。

流れ;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

コード;

質問;

私の現在の Iteratee コードはうまく機能します。ただし、どちらの側でも障害を処理できるようにしたいと考えています。

  1. InputStreamreadメソッドが失敗した場合 - 正常に処理されたバイト数/ブロック数を知り、その時点からストリームの読み取りを再開したい。列挙子を読み込むとエラーがスローされ、fut例外が返されるだけで、状態がないため、rxing アクターに渡さない限り、現在どのブロックにいるのかわかりません (これはやりたくないことです)。
  2. 出力側が失敗した場合、またはアクタのバッファが入力ストリームからの読み取りを完全に保持しているため、DataBlock メッセージを受信できなくなった場合

どうすればいいですか?

定義済みのエラー処理が必要なため、Play の iteratees よりも react-streams/Akka-stream (実験的) または scalaz iteratees を使用してこれを試した方がよいでしょうか?

0 投票する
0 に答える
1455 参照

scala - Akka Stream での例外/エラー処理

次のダクトを定義しました。

およびフロー:

Augmenter は次のようになります。

ここで、Augmenter1 の例外につながる条件が満たされた場合、フローは例外をスローすることなく、例外の最初のインスタンスで (正常に) 終了します。2 つのことを実行できるようにしたいと考えています。例外をチェーンでキャッチし、次のイベントにスキップします。

私の質問: フロー内のエラー/例外を処理する適切な方法は何ですか?

ありがとう

0 投票する
1 に答える
1494 参照

scala - Akka Reactive ストリームのテスト

次の方法で取得した発信ストリーム TCP 接続を介してメッセージをストリーミングするコードをテストしています。

私のテストでは、結果Subscriber[ByteString]をダミーのサブスクライバーに置き換え、いくつかの送信メッセージをトリガーし、期待どおりに到着したことをアサートします。以下のメソッドを使用して、ダミーのサブスクライバーとストリーム結果の未来を生成します。(ここまでは順調ですね)

私の質問は次のとおりです。ストリームが期待値を出力することをテストするための標準的な方法はありますTestActorRefか? そうでない場合、上記の関数に似たライブラリ関数はありますか?

0 投票する
1 に答える
760 参照

akka - フロー グラフの作成

私は Akka Streams をいじって、 を作成しFlow、それらを s を使用して接続するというアイデアを得ましたFlowGraph

Akka のこの部分はまだ開発中であることはわかっています。そのため、いくつかのことが完了していない可能性があり、他のビットが変更される可能性がありますが、「完全」ではない (つまり、a に接続されていない) FlowGraph を作成して渡すことは可能ですかSink?フローを追加してコードのさまざまな部分を拡張し、最後にシンクを追加して完成させますか?

基本的に、FlowGraph を作成できるようにしたいのですが、方法がわかりません...特に、FlowGraph が Broadcast を使用してストリームを分割した場合。

ありがとう

0 投票する
2 に答える
2037 参照

scala - NIO バイナリ処理に Reactive Streams を使用するには?

org.reactivestreamsライブラリを使用して Java NIO を使用して大きなデータ ストリームを処理するコード例はありますか(高パフォーマンスのため)? 私は分散処理を目指しているので、Akka を使用した例が最適ですが、それは理解できます。

scala でファイルを読み取るほとんどの (すべてではないことを願っています) 例は、Source(非バイナリ) または直接の Java NIO (さらにはFiles.readAllBytes!)に頼っています。

おそらく、見逃したアクティベーター テンプレートがありますか? ( Akka Streams with Scala!は、バイナリ/NIO 側を除いて、私が必要とするすべてに密接に対応しています)

0 投票する
1 に答える
6189 参照

reactive-programming - 2 つのストリームを (null なしで) マージし、ペアに条件を適用する方法は?

2 つのデータ ストリームがあるとします。それらをマージして、これら 2 つのストリーム間のデータに条件を適用する方法はありますか? 例えば

rxjsを使用して上記の合成ストリームを取得するには? いくつかの通知を発生させるために、構成されたストリームに条件を適用したいと思います。また、最後の既知の null 以外のデータを使用することもできます。たとえば、以下の構成されたストリームを参照してください。

リアクティブ ストリームのアイデアを使い始めたばかりなので、リアクティブ ストリームのアイデアを誤解している場合は修正してください。

0 投票する
1 に答える
899 参照

scala - マテリアライズド Akka Stream フロー グラフを公開またはサブスクライブするにはどうすればよいですか?

私は Akka Stream をいじっており、実体化後の柔軟性を理解しようとしています。

これを行う 1 つの方法は、低レベルのリアクティブ ストリーム API を使用することです: http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource

ただし、パブリッシュまたはサブスクライブするこれらのポイントを定義する必要があります。任意のマテリアライズド フロー グラフ ノードを公開またはサブスクライブする方法はありますか? マテリアライズド フロー グラフはアクターの集まりにすぎないため、これは可能です。

例: 最初に、フロー グラフ 1 をデプロイします: A ~> B ~> C

次に、フロー グラフ 2 と 3 をデプロイします: D ~> BB ~> E

0 投票する
1 に答える
1657 参照

scala - Akka-Streams: Kafka と Cassandra での At-Least-Once-Delivery 動作

私は Akka Streams から始めていますが、これまでのところすべてうまくいっています。ただし、アプローチ方法がわからないユースケースに遭遇しました。このシナリオは、Kafka からのメッセージを消費するソースとしての ActorPublisher と、Cassandra テーブルを更新するシンクとしてのサブスクライバーを持つストリームです。

Kafka ~> いくつかのマッピング操作 ~> Cassandra

要点は、メッセージが正常に処理され、Cassandra に挿入されるたびに、Kafka に明示的に確認したいということです。これにより、災害が発生してサービスが失敗した場合にメッセージを読み直すことができます。つまり、少なくとも何らかの一度配信動作。Akka Streams の観点からこれにどのようにアプローチできますか?. サポートされているシナリオですか?.

自動コミット動作を使用して Kafka コンシューマーをいつでも構成できるのは事実ですが、メッセージの読み取り方法を制御したいのです。

アップデート

このトピックに関して、私たちは現在リアクティブ Kafkaを評価しており、バージョン 0.8 の時点で kafka に手動コミットが含まれています (これらの人に敬意を表します)。この機能により、必要な alo 動作を実装できます。

0 投票する
1 に答える
1022 参照

servlets - サーブレットで Java 9 フローを使用したリアクティブ ストリームのユースケースは?

サーブレット コンテナー (または単に HTTP サーバー) 内でリアクティブ ストリームを使用するユースケースを探しています。

Jetty プロジェクトは、 「Jetty はリアクティブですか?」という質問を受け始めています。Java 9 にリアクティブ ストリームを追加するという提案に気付きました。

そのため、非同期サーブレット IO にリアクティブ ストリーム API を使用するいくつかの実験を開始しました。これは十分に興味深いものですが、最も重要な関心事に焦点を当てる実際の使用例がないため、焦点が当てられていません。

誰かが彼らのニーズを満たすために私たちの突堤の実験を指示できるように、共有/説明できる良いユースケースを持っていますか. 私が想像したのは、RS ベースのデータベース パブリッシャーが、途中で変換のために Flow.Processors を使用して、HTTP 応答または Websocket 接続でオブジェクトを送信することです。

0 投票する
1 に答える
71 参照

scala - Can I implement my own OverflowStrategy?

Is it possible (or will it be possible in the future) to implement my own OverflowStrategy as a function of the current buffer of the element? Or there's a particular reason to not allow that?

Thanks!