問題タブ [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
829 参照

scala - 無限 akka ストリームを終了する方法

私は Akka Streams を初めて使用しますが、それを使用して無限のソースからの順列を探したい場合があります。有限のソースを使用した単純化された例は、次のようになります。

この例の出力は次のとおりです。

ソースが通過しても問題ないことは明らかです42が、結果を取得する前にストリーム全体を使い果たしたくありません。

質問は、探しているものを見つけたときにストリームを終了するにはどうすればよいですか?

0 投票する
2 に答える
6261 参照

scala - akka http: 残りのサービスを構築するための Akka ストリーム vs アクター

akka http で 60 以上の API を使用して REST Web サービスを作成する場合。akka ストリームと akka アクターのどちらを使用するかを選択するにはどうすればよいですか? Jos は彼の投稿で、akka http で API を作成する 2 つの方法を示していますが、いつどちらを選択すべきかについては示していません。

0 投票する
1 に答える
1073 参照

scala - akka ストリームとスプレーを使用して csv ファイルをブラウザーにストリーミングする

Source[String, Unit]をストリーミング アクターに接続するにはどうすればよいですか?

https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33StreamingActorの修正版はうまく機能すると思いますが、ピースを接続するのに苦労しています。

と が与えられた場合、変更されたは と接続する必要があるsource: Source[String, Unit]ctx: RequestContext思います。StreamingActoractorRefFactory.actorOf(fromSource(source, ctx))

参考までに、上記の要点:

0 投票する
1 に答える
1479 参照

debugging - akka ストリーム フローをデバッグするには?

メソッド processLine のどこかにブレークポイントをドロップすると、デバッガーはその行で停止しません。ブレークポイントがないかのように実行されます。akka ストリーム フローのデバッグは多少異なりますが、この問題を解決するにはどうすればよいですか?

0 投票する
2 に答える
5395 参照

akka - akka ストリームを使用して条件付きでフローをスキップする

私は akka ストリームを使用しており、フローが特定の値を処理できないため、条件付きでスキップする必要があるグラフのセグメントがあります。具体的には、文字列を受け取って http リクエストを行うフローがありますが、文字列が空の場合、サーバーはケースを処理できません。しかし、代わりに空の文字列を返す必要があります。失敗することを知っているhttpリクエストを実行する必要なく、これを行う方法はありますか? 私は基本的にこれを持っています:

私が考えられる唯一のことは、httpResponse フローで 400 エラーをキャッチし、デフォルト値を返すことです。しかし、事前に失敗することがわかっているリクエストに対してサーバーにアクセスするオーバーヘッドを回避できるようにしたいと考えています。

0 投票する
3 に答える
1713 参照

scala - 無限ストリームからの着信イベントをグループ化する方法は?

私はイベントの無限の流れを持っています:

すなわち

これらのイベントを session_uid でグループ化し、各セッションのトラフィックの合計を計算します。

akka-streams私は有限ストリームの使用で問題なく動作するフローを書きました(クックブックのこのgroupByに基づいた私のコード ベース)。ただし、無限ストリームでは機能しません。関数はすべての受信ストリームを処理し、その後でのみ結果を返す準備ができるためです。groupBy

タイムアウト付きのグループ化を実装する必要があると思います。つまり、指定された stream_uid のイベントを最後から 5 分以上受信しない場合は、この session_uid のグループ化されたイベントを返す必要があります。しかし、それを実装する方法akka-streamsは?

0 投票する
0 に答える
1043 参照

scala - groupBy は akka-stream でリークしていますか?

akka-streamsession_uid によって無限ストリームからのイベントをグループ化するためのフローを記述し、各セッションのトラフィックの合計を計算したいと考えています(詳細は前の質問で)。

session_uid によるグループ イベントの関数を使用する予定ですSource#groupByが、この関数はすべてのグループ キーを内部に蓄積し、それらを解放する方法がないようです。これによりjava.lang.OutOfMemoryError: Java heap space例外が発生します。これを再現するためのコードは次のとおりです。

では、関連するイベント ストリーム ( ) の処理が完了した後、sessionUid内部でグループ化キー ( ) を解放するにはどうすればよいでしょうか。groupBysessionEvents

session_uid ベースでイベントをグループ化する別の方法を知っている人はいakka-streamますか?

0 投票する
0 に答える
447 参照

scala - ソースを作成して要素を手動でプッシュする方法は?

メソッドのように機能し、要素を発行するカスタムStatefulStageを作成したいのですが、インスタンスを作成して着信要素をプッシュする方法がわかりません。スタブは次のとおりです。groupBySource[A, Unit]Source[A, Unit]

次のスニペットを GroupBy フローのテストに使用できます (作成されたストリームからイベントを出力する必要があります)。

誰も私にそれを行う方法を説明できますか?

アップデート:

この回答に基づいて次のonPushメソッドを作成しましたが、イベントは出力されませんでした。私が理解しているように、フローの一部として実行されている場合にのみ要素をソースにプッシュできますが、テスト スニペットの外でフローを実行したいと考えています。この例のようにフローを実行すると、イベントが処理されて に送信されます。これが、テスト スニペットがイベントを出力しなかった理由だと思います。GroupByGroupBySink.ignore

それで、それを修正する方法は?

0 投票する
1 に答える
170 参照

scala - 成長するリストのための scalaz ストリーム構造

このような問題を解決するために、scalaz-streams を使用できる (すべきか?) という予感があります。

開始項目 A があります。A を受け取り、A のリストを返す関数があります。

1 つのアイテム (開始アイテム) で始まるワーク キューがあります。各アイテムを処理 ( doSomething) すると、同じワーク キューの最後に多くのアイテムが追加される場合があります。ただし、ある時点 (何百万ものアイテムの後) に、後続の各アイテムdoSomethingが作業キューに追加するアイテムの数が減り始め、最終的には新しいアイテムが追加されなくなります (doSomething はこれらのアイテムに対して Nil を返します)。これにより、計算が最終的に終了することがわかります。

scalaz-streams がこれに適していると仮定すると、これを実装するためにどの全体的な構造または型を調べる必要があるかについてのヒントを教えてください。

単一の「ワーカー」を使用した単純な実装が完了したら、複数のワーカーを使用してキュー アイテムを並行して処理したいと考えていますdoSomething。そのため、このアルゴリズムでも効果 (ワーカーの失敗など) を処理する必要があります。