問題タブ [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Source[ByteString, Any] を InputStream に変換する方法
akka-http は、multipart/form-data エンコーディングを使用してアップロードされたファイルを として表しますSource[ByteString, Any]
。を期待する Java ライブラリを使用して非整列化する必要がありInputStream
ます。
どのようSource[ByteString, Any]
にに変えることができますInputStream
か?
scala - Stream を使用した Akka-http プロセス リクエスト
requestProcessorストリームでバックプレッシャーを伴う長時間の処理を使用する予定であるため、常に1つのプリコンパイル済みストリームでhttpリクエストを処理する単純なakka-httpおよびakka-streamsベースのアプリケーションを作成してみます
私のアプリケーションコード:
処理する新しいアイテムを動的に受け入れることができるソースを作成する方法についての解決策を見つけましたが、ルートでストリーム実行の結果を取得する方法についての解決策を見つけることができます
scala - Source.actorRef によって作成された akka ストリーム Source の基になる ActorRef へのアクセス
Source.actorRefメソッドを使用してakka.stream.scaladsl.Sourceオブジェクトを作成しようとしています。フォームの何か
私の質問は: ActorRef ベースの Source オブジェクトにデータを送信するにはどうすればよいですか?
ソースへのメッセージの送信は、次のような形式であると想定しました
ただし、演算子またはメソッドweatherSource
はありません。!
tell
ドキュメントには、Source.actorRef の使用方法があまり説明されていません。
あなたのレビューと応答を事前に感謝します。
scala - 構成された max-open-requests を超えました
最近、akka ストリームを使用していくつかの小さな Web 処理サービスを構築し始めました。とても簡単です。redis から URL を取得し、それらの URL (画像) をダウンロードします。後で画像を処理し、それらを s3 にプッシュし、json を redis にプッシュします。
複数のサイトからさまざまな種類の画像をダウンロードしています。404、Unexpected disconnect 、Response Content-Length 17951202 が構成された制限の 8388608 を超えています、EntityStreamException: Entity stream truncation and redirects などのエラーが大量に発生します。リダイレクトを使用して、応答のロケーションヘッダーにあるアドレスで requestWithRedirects を呼び出しています。
ダウンロードを担当する部分は、次のようになります。
TimeoutFuture は非常に単純で、future と timeout が必要です。future がタイムアウトよりも長くかかる場合は、timeout 例外で他の future を返します。私が抱えている問題は、しばらくするとエラーが発生することです:
何が問題なのかはわかりませんが、正しく終了していないダウンロードがいくつかあり、しばらくすると上記のエラーが発生して接続のグローバルプールにとどまると思います。問題の原因となっている可能性のあるアイデアはありますか? または、問題の原因を見つける方法: 既に 404 応答をテストしましたが、Response Content-Length が ... エラーを超えていますが、トラブルメーカーではないようです。
編集:おそらく問題は私のTimeoutFutureにあります。ここhttps://stackoverflow.com/a/29330010/2963977で説明されているようにエラーで埋めていますが、私の意見では、実際にイメージをダウンロードすることは決して完了せず、接続プールのリソースを消費しています。
私の場合、これらの設定が影響を与えないのはなぜだろうか:
EDIT2:
どうやらタイムアウトはまだサポートされていません。これが私のバグレポートです https://github.com/akka/akka/issues/17732#issuecomment-112315953
akka - Akka Stream OnNext は許可されていません
akka ストリームの ActorPublisher の例をたどったところ、ときどき次のメッセージが表示されました。
java.lang.IllegalStateException: ストリームが要素を要求していない場合、onNext は許可されません。totalDemand は 0 でした
ドキュメントを見て、彼らは説明します:
onNext を呼び出して要素をストリームに送信します。ストリーム サブスクライバーから要求された数の要素を送信できます。この金額は、totalDemand で照会できます。isActive で totalDemand>0 の場合にのみ onNext を使用できます。それ以外の場合、onNext は IllegalStateException をスローします。
ストリーム サブスクライバーがさらに多くの要素を要求すると、ActorPublisherMessage.Request メッセージがこのアクターに配信され、そのイベントに対応できます。totalDemand は自動的に更新されます。
totalDemand がゼロにならないようにするにはどうすればよいですか? このエラーが発生したとき、送信しようとしていたメッセージを失いました。
これが私が従ってきた例です:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
これは私のクラスのテストです
さて、私は kafka からメッセージを受け取り、WorkerActor に渡していますが、1 秒あたり 10 件のメッセージを Kafka に送信すると、このエラーが原因でメッセージの一部が失われます。
アップデート
ここで説明されているエラーに直面していました(同じライブラリを使用):
https://github.com/softwaremill/reactive-kafka/issues/11
私はバッファを使用して解決しましたが、この PR で問題が解決するようです。
java - アクター呼び出しの結果に応答する方法は?
Akka-HTTP Java API の使用、つまりルーティング DSL の使用を検討しています。
ルーティング機能を使用して HttpRequest に応答する方法は明確ではありません。型指定されていない Akka アクターを使用します。たとえば、Route パスの照合時に、どのようにしてリクエストを「ハンドラ」ActorRef に渡し、非同期で HttpResponse で応答するのでしょうか?
同様の質問が Akka-User メーリング リストに投稿されましたが、フォローアップの解決策はありませんでした - https://groups.google.com/d/msg/akka-user/qHe3Ko7EVvg/KC-aKz_o5aoJ。
scala - メソッド呼び出しを介して後で要素を受け取ることができる Source を作成する方法は?
Source
次のように、後でプッシュ要素を作成したいと思います。
これを行うための推奨される方法は何ですか?
ありがとう!
scala - OverflowStrategyでストリームにドロップされた要素を印刷する方法は?
OverflowStrategy.dropHeadまたはOverflowStrategy.dropTailを使用しているときに、ストリームのドロップされた要素を出力する方法はありますか?
scala - Can I implement my own OverflowStrategy?
Is it possible (or will it be possible in the future) to implement my own OverflowStrategy as a function of the current buffer of the element? Or there's a particular reason to not allow that?
Thanks!