問題タブ [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Scala Akka Stream: Seq を通過する方法
いくつかのブロッキング呼び出しを でラップしようとしていFuture
ます。戻り値の型はSeq[User]
です。以下は、さまざまなオーバーロードされたバージョンが存在するという苦情でコンパイルされません。助言がありますか?私はほとんどすべてのバリエーションを試しましたが、運がありません。User
case class
Source.apply
json - Akka HTTP: Json 形式の応答をドメイン オブジェクトにアンマーシャリングする方法
私は Akka HTTP を試しており、HttpResponse でドメイン オブジェクトの Json 配列を返すサービスを作成しました。クライアントでは、それをドメイン オブジェクトのソースに変換して、後続のフローとシンクで使用できるようにしたいと考えています。
Json サポートセクションを参照: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html
暗黙的な RootJsonReader などを定義する必要はありましたが、FromEntityUnmarshaller を使用する方法がわかりません。
私のコードはここにあります: https://github.com/charlesxucheng/akka-http-microservice
akka-http-microservice activator テンプレートに基づいています。Service2.scala は私のサーバー実装であり、機能しています。AkkaHttpClient.scala はクライアントの実装であり、不完全です。
build.sbt が最新ではないため、ビルドには Gradle を使用してください。
ありがとう。
scala - Akka/Scala: この Akka Streams フローで何が起こっているのか説明できますか?
私は Akka ストリームに慣れていて、次のようにフィボナッチ パブリッシャー/サブスクライバーの例を作成しました。しかし、需要が最初にどのように生成され、それが加入者の要求戦略とどのような関係にあるのかはまだよくわかっていません。誰か説明してくれませんか?
フィボナッチ出版社:
フィボナッチ購読者:
フィボナッチ アプリ:
サンプル実行:質問: 4 の最初の需要はどこから来たのですか?
java - 春の統合とリアクティブストリーム
私はETLプロジェクトに取り組んでいます。私は長い間春の統合を使用してきました。現在、データ ソースはファイルまたはクロニクルですが、ライブ ストリームに変更される可能性があり、量が増える可能性があります。将来的にはビッグデータ ソリューション (hadoop、spark など) に移行する可能性があります。
これに基づいて、スプリング統合とリアクティブ ストリームを比較する必要がありますか? なぜ誰かが一方を他方よりも優先して使用するのでしょうか (または、そもそも 2 つを比較しようとしている私が間違っているのでしょうか)。一緒に使用できると思われるシナリオ (ある場合) はありますか?
scala - Akka-Stream の実装は、シングル スレッドの実装よりも遅い
2015 年 10 月 30 日からの更新
Roland Kuhn Awnser に基づく:
Akka Streams は、アクタ間の非同期メッセージ パッシングを使用して、ストリーム処理ステージを実装しています。非同期境界を越えてデータを渡すと、ここに表示されているオーバーヘッドがあります。ストリーミング ソリューションでは要素あたり約 1 マイクロ秒かかるのに対し、計算には約 160 ナノ秒しかかからないように見えますが (シングル スレッドの測定から得られます)、これはメッセージ パッシングによって支配されます。
もう 1 つの誤解は、「ストリーム」という言葉は並列処理を意味するというものです。コードでは、すべての計算が 1 つのアクタ (マップ ステージ) で順次実行されるため、基本的なシングル スレッド ソリューションに勝るメリットは期待できません。
Akka Streams によって提供される並列処理を利用するには、それぞれが次のタスクを実行する複数の処理ステージが必要です。
要素あたり 1µs。ドキュメントも参照してください。
私はいくつかの変更を行いました。私のコードは次のようになります。
何か完全に間違っているかどうかはわかりませんが、それでも akka-streams を使用した実装ははるかに遅くなります (以前と同じようにさらに遅くなります)。 -ストリームが高速になります。したがって、正しく理解できれば (それ以外の場合は修正してください)、私の例ではオーバーヘッドが多すぎるようです。コードが重い仕事をしなければならない場合にのみ akka-streams の恩恵を受けるということですか?
私はscalaとakka-streamの両方で比較的新しいです。カウンターが特定の数に達するまでいくつかのイベントを作成する小さなテスト プロジェクトを作成しました。イベントごとに、イベントの 1 つのフィールドの階乗が計算されます。これを2回実装しました。1 回は akka-stream を使用し、もう 1 回は akka-stream を使用せずに (シングル スレッド)、ランタイムを比較しました。
私はそれを期待していませんでした: 単一のイベントを作成すると、両方のプログラムの実行時間はほぼ同じです。しかし、70,000,000 個のイベントを作成すると、akka-stream を使用しない実装の方がはるかに高速です。これが私の結果です(次のデータは24回の測定に基づいています):
- akka-streams なしの単一イベント: 403 (± 2)ms
akka-streams を使用した単一イベント: 444 (+-13)ms
akka-streams のない 70Mio イベント: 11778 (+-70)ms
- akka-steams での 70Mio イベント: 75424(+-2959)ms
だから私の質問は:何が起こっているのですか?akka-stream を使用した実装が遅いのはなぜですか?
ここに私のコード:
Akka による実装
Akka を使用しない実装
オブジェクトシングルスレッド{
共有機能
実装イベント
unit-testing - Project Reactor: ブロードキャスターが終了するまで待ちます
文字列を受け取り、それらを StringBuilder に追加する Broadcaster があります。
テストしたいです。
Thread#sleep
ブロードキャスターが文字列の処理を終えるまで、私は待たなければなりません。削除したいsleep
。
使ってみてControl#debug()
失敗。
scala - Scala Slick: 終わらないストリーム
Slick を使用すると、テーブルから結果のストリームを生成するために次のことができます。
これにより、テーブル内のすべてのイベントが出力されevents
、最後の行の後に終了します。
events
新しい行がテーブルに入力されたときに何らかの方法で通知できると仮定すると、イベントが挿入されたときにイベントを継続的に出力するストリームを作成することは可能ですか? tail -f
DB テーブルの一種。
Slick はこれをネイティブにサポートしないと思いますが、Akka ストリーミングを使用して支援できるはずです。したがって、空になるまで Slick Source から何かを取得し、イベントがテーブル内の追加データを示すのを待ってから、新しいデータをストリーミングすることができた場合。おそらく、を使用しActorPublisher
てこのロジックをバインドしますか?
誰かがこの分野での経験やアドバイスを持っているかどうか疑問に思っていますか?
java - 複数 (10k - 100k) のリクエストに対して Akka HTTP クライアントを適切に呼び出す方法は?
Akka HTTP 2.0-M2 を使用してバッチ データ アップロード用のツールを作成しようとしています。しかし、私は直面していますakka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error.
私は問題を切り分けようとしましたが、これも失敗するサンプル コードです。
次のエラーで失敗します。
これは、多くの Future を作成して一度に実行しようとしているからだと思います。しかし、Akka はバックプレッシャを有効にするべきではありませんか? 使い方が間違っていると思います。私は superPool メソッドを試しましたが、私が理解しHttp.singleRequest
ているように、内部に同じプールがあるため、何も変わりませんでした。また、ループで呼び出す代わりに Http インスタンスを再利用しようとしHttp.get()
ましたが、それも役に立ちませんでした。
リクエストのバッチを起動する正しい方法は何ですか? 10,000 から 100,000 のリクエストのバッチを実行する予定です。
java - リアクティブ ストリームとリアクティブ ストリームの違いは何ですか?
特に RxJava のコンテキストで、 Reactive と ReactiveStreams の違いを理解しようとしていますか?
私が理解できたのは、Reactive Streams の仕様にはバックプレッシャの概念がいくつかあるが、RxJava/Reactive にはrequest(n)
インターフェイスで既に存在するということでした。
ELI5の回答は気にしません。