問題タブ [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
3 に答える
15084 参照

scala - Source[ByteString, Any] を InputStream に変換する方法

akka-http は、multipart/form-data エンコーディングを使用してアップロードされたファイルを として表しますSource[ByteString, Any]。を期待する Java ライブラリを使用して非整列化する必要がありInputStreamます。

どのようSource[ByteString, Any]にに変えることができますInputStreamか?

0 投票する
1 に答える
1822 参照

scala - Stream を使用した Akka-http プロセス リクエスト

requestProcessorストリームでバックプレッシャーを伴う長時間の処理を使用する予定であるため、常に1つのプリコンパイル済みストリームでhttpリクエストを処理する単純なakka-httpおよびakka-streamsベースのアプリケーションを作成してみます

私のアプリケーションコード:

処理する新しいアイテムを動的に受け入れることができるソースを作成する方法についての解決策を見つけましたが、ルートでストリーム実行の結果を取得する方法についての解決策を見つけることができます

0 投票する
3 に答える
8236 参照

scala - Source.actorRef によって作成された akka ストリーム Source の基になる ActorRef へのアクセス

Source.actorRefメソッドを使用してakka.stream.scaladsl.Sourceオブジェクトを作成しようとしています。フォームの何か

私の質問は: ActorRef ベースの Source オブジェクトにデータを送信するにはどうすればよいですか?

ソースへのメッセージの送信は、次のような形式であると想定しました

ただし、演​​算子またはメソッドweatherSourceはありません。!tell

ドキュメントには、Source.actorRef の使用方法があまり説明されていません。

あなたのレビューと応答を事前に感謝します。

0 投票する
0 に答える
1355 参照

scala - 構成された max-open-requests を超えました

最近、akka ストリームを使用していくつかの小さな Web 処理サービスを構築し始めました。とても簡単です。redis から URL を取得し、それらの URL (画像) をダウンロードします。後で画像を処理し、それらを s3 にプッシュし、json を redis にプッシュします。

複数のサイトからさまざまな種類の画像をダウンロードしています。404、Unexpected disconnect 、Response Content-Length 17951202 が構成された制限の 8388608 を超えています、EntityStreamException: Entity stream truncation and redirects などのエラーが大量に発生します。リダイレクトを使用して、応答のロケーションヘッダーにあるアドレスで requestWithRedirects を呼び出しています。

ダウンロードを担当する部分は、次のようになります。

TimeoutFuture は非常に単純で、future と timeout が必要です。future がタイムアウトよりも長くかかる場合は、timeout 例外で他の future を返します。私が抱えている問題は、しばらくするとエラーが発生することです:

何が問題なのかはわかりませんが、正しく終了していないダウンロードがいくつかあり、しばらくすると上記のエラーが発生して接続のグローバルプールにとどまると思います。問題の原因となっている可能性のあるアイデアはありますか? または、問題の原因を見つける方法: 既に 404 応答をテストしましたが、Response Content-Length が ... エラーを超えていますが、トラブルメーカーではないようです。

編集:おそらく問題は私のTimeoutFutureにあります。ここhttps://stackoverflow.com/a/29330010/2963977で説明されているようにエラーで埋めていますが、私の意見では、実際にイメージをダウンロードすることは決して完了せず、接続プールのリソースを消費しています。

私の場合、これらの設定が影響を与えないのはなぜだろうか:

EDIT2:

どうやらタイムアウトはまだサポートされていません。これが私のバグレポートです https://github.com/akka/akka/issues/17732#issuecomment-112315953

0 投票する
1 に答える
815 参照

akka - Akka Stream OnNext は許可されていません

akka ストリームの ActorPublisher の例をたどったところ、ときどき次のメッセージが表示されました。

java.lang.IllegalStateException: ストリームが要素を要求していない場合、onNext は許可されません。totalDemand は 0 でした

ドキュメントを見て、彼らは説明します:

onNext を呼び出して要素をストリームに送信します。ストリーム サブスクライバーから要求された数の要素を送信できます。この金額は、totalDemand で照会できます。isActive で totalDemand>0 の場合にのみ onNext を使用できます。それ以外の場合、onNext は IllegalStateException をスローします。

ストリーム サブスクライバーがさらに多くの要素を要求すると、ActorPublisherMessage.Request メッセージがこのアクターに配信され、そのイベントに対応できます。totalDemand は自動的に更新されます。

totalDemand がゼロにならないようにするにはどうすればよいですか? このエラーが発生したとき、送信しようとしていたメッセージを失いました。

これが私が従ってきた例です:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

これは私のクラスのテストです

さて、私は kafka からメッセージを受け取り、WorkerActor に渡していますが、1 秒あたり 10 件のメッセージを Kafka に送信すると、このエラーが原因でメッセージの一部が失われます。

アップデート

ここで説明されているエラーに直面していました(同じライブラリを使用):

https://github.com/softwaremill/reactive-kafka/issues/11

私はバッファを使用して解決しましたが、この PR で問題が解決するようです。

https://github.com/softwaremill/reactive-kafka/pull/13

0 投票する
2 に答える
1909 参照

java - アクター呼び出しの結果に応答する方法は?

Akka-HTTP Java API の使用、つまりルーティング DSL の使用を検討しています。

ルーティング機能を使用して HttpRequest に応答する方法は明確ではありません。型指定されていない Akka アクターを使用します。たとえば、Route パスの照合時に、どのようにしてリクエストを「ハンドラ」ActorRef に渡し、非同期で HttpResponse で応答するのでしょうか?

同様の質問が Akka-User メーリング リストに投稿されましたが、フォローアップの解決策はありませんでした - https://groups.google.com/d/msg/akka-user/qHe3Ko7EVvg/KC-aKz_o5aoJ

0 投票する
3 に答える
16608 参照

scala - メソッド呼び出しを介して後で要素を受け取ることができる Source を作成する方法は?

Source次のように、後でプッシュ要素を作成したいと思います。

これを行うための推奨される方法は何ですか?

ありがとう!

0 投票する
1 に答える
139 参照

scala - OverflowStrategyでストリームにドロップされた要素を印刷する方法は?

OverflowStrategy.dropHeadまたはOverflowStrategy.dropTailを使用しているときに、ストリームのドロップされた要素を出力する方法はありますか?

0 投票する
1 に答える
71 参照

scala - Can I implement my own OverflowStrategy?

Is it possible (or will it be possible in the future) to implement my own OverflowStrategy as a function of the current buffer of the element? Or there's a particular reason to not allow that?

Thanks!