問題タブ [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka-Streams: Kafka と Cassandra での At-Least-Once-Delivery 動作
私は Akka Streams から始めていますが、これまでのところすべてうまくいっています。ただし、アプローチ方法がわからないユースケースに遭遇しました。このシナリオは、Kafka からのメッセージを消費するソースとしての ActorPublisher と、Cassandra テーブルを更新するシンクとしてのサブスクライバーを持つストリームです。
Kafka ~> いくつかのマッピング操作 ~> Cassandra
要点は、メッセージが正常に処理され、Cassandra に挿入されるたびに、Kafka に明示的に確認したいということです。これにより、災害が発生してサービスが失敗した場合にメッセージを読み直すことができます。つまり、少なくとも何らかの一度配信動作。Akka Streams の観点からこれにどのようにアプローチできますか?. サポートされているシナリオですか?.
自動コミット動作を使用して Kafka コンシューマーをいつでも構成できるのは事実ですが、メッセージの読み取り方法を制御したいのです。
アップデート
このトピックに関して、私たちは現在リアクティブ Kafkaを評価しており、バージョン 0.8 の時点で kafka に手動コミットが含まれています (これらの人に敬意を表します)。この機能により、必要な alo 動作を実装できます。
servlets - サーブレットで Java 9 フローを使用したリアクティブ ストリームのユースケースは?
サーブレット コンテナー (または単に HTTP サーバー) 内でリアクティブ ストリームを使用するユースケースを探しています。
Jetty プロジェクトは、 「Jetty はリアクティブですか?」という質問を受け始めています。Java 9 にリアクティブ ストリームを追加するという提案に気付きました。
そのため、非同期サーブレット IO にリアクティブ ストリーム API を使用するいくつかの実験を開始しました。これは十分に興味深いものですが、最も重要な関心事に焦点を当てる実際の使用例がないため、焦点が当てられていません。
誰かが彼らのニーズを満たすために私たちの突堤の実験を指示できるように、共有/説明できる良いユースケースを持っていますか. 私が想像したのは、RS ベースのデータベース パブリッシャーが、途中で変換のために Flow.Processors を使用して、HTTP 応答または Websocket 接続でオブジェクトを送信することです。
scala - Can I implement my own OverflowStrategy?
Is it possible (or will it be possible in the future) to implement my own OverflowStrategy as a function of the current buffer of the element? Or there's a particular reason to not allow that?
Thanks!
java - Java / Scala - ReactiveStreams / Enumeratee read() が -1 を返すまで継続的に入力ストリームを読み取る方法
現在私は Publisher[InputStream] を持っています Java8 Streams API が提供するようなものを持っていることをどのように知ることができますか?
現在、私は Scala と playframework を使用しており、実験的な akka-streams / reactstreams ライブラリを介して出力をチャンクしたいと考えていますが、両方に関するドキュメントがほとんどまたはまったくないため、手がかりがありません。助言がありますか?
scala - Akka Streams と同期されたフィードバック
私が達成しようとしているのは、akka ストリームを使用した同期フィードバック ループのようなものを実装することです。
を持っているとしましょうFlow[Int].filter(_ % 5 == 0)
。のストリームInt
をこのフローにブロードキャストし、タプルをその直後に圧縮すると、次のような結果が得られます
Option[Int]
次の要素をプッシュした後にフローが要素を放出したかどうかを示すを放出する方法はありますか?
DetachedStage
フローが前Flow
の段階で引っ張られるたびに、私は彼が次の要素を必要とすることを知っていました。背後のステージが要素を受け取らなかった場合、それは None でした。
残念ながら、結果は良くなく、多くの順位がずれています。
補足事項
フィルタ フローは単なる例です。これは非常に長いフローになる可能性があり、そのフローのOption
すべての段階で を発行する機能を提供することはできません。そのため、フローが次をプッシュしたかどうかを実際に知る必要があります。代わりにダウンストリームから次をリクエストしました
と も試してみましたがconflate
、expand
これらは結果の位置オフセットでさらに悪化しています
構成で変更したことの 1 つは、フローのinitial
andmax
バッファーでした。これにより、示された要求が実際にプッシュした要素の後にあることを確認できます。
この問題を解決する方法についていくつかの提案をいただければ幸いです。
scala - パブリッシャーベースのソースは要素を出力しません
次のように、ReactiveStreams Publisher に基づいて Akka Stream の Source を作成しました。
Flickr に検索要求を行い、結果をJsValue
s としてソースします。さまざまなフローとシンクに接続しようとしましたが、これが最も基本的なセットアップになります。
がonNext
数回呼び出され、次にonComplete
. ただし、シンクは何も受け取りません。これはソースを作成する有効な方法ではありませんか?