問題タブ [monix]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
152 参照

scala - 猫で抽象的なコレクションを作成する効率的な方法

ファイルのストリーム処理にMonix を使用するコードがあります。Observableこのコードをテストするには、 で行う操作をObservable型に依存しないようにして、 のような他のデータ構造でも実行できるようにしListます。そのため、基になるデータ構造を抽象化するために次のコードを記述しました。

私を悩ませているのは、このコードが多くの中間データ構造を作成することです。このプロセスをより効率的にするために使用できる型クラスはありますか? LiftIOアイテムのコレクションのように、オーバーヘッドをあまりかけずに、あるデータ構造を別のデータ構造に持ち上げるものはありますか?

0 投票する
1 に答える
359 参照

scala - Monix Observable groupメモリリークのない多数のキーによる

ObservableMonix でキーごとに単一の分割を実行してからn、すべての最後のイベントにグループ化GrouppedObservableし、さらに処理するために送信します。問題は、グループ化するキーの数が無限である可能性があり、メモリ リークが発生することです。

アプリケーションのコンテキスト:

多くの会話からのメッセージを含むカフカ ストリームがあります。各会話にはroomId、この id をグループ化して、それぞれが単一の会話からのメッセージのみを含むオブザーバブルのコレクションを取得したいと考えています。通常、会話ルームは短命です。つまり、一意の で新しい会話が作成され、roomId短期間に数十のメッセージが交換され、その後会話が閉じられます。メモリ リークを回避するために、最新の会話を 100 ~ 1000 だけバッファに保持し、古い会話は破棄したいと考えています。そのため、イベントが長い間見られなかった会話から発生した場合、以前のメッセージを含むバッファーが忘れられるため、新しい会話として扱われます。

MonixkeysBufferの groupBy メソッドには、キー バッファーの処理方法を指定する引数があります。

DropOldストラテジーに指定するkeyBufferことで、私が望んでいた動作を実現できると思いました。

以下は、説明されている使用例の簡略化されたバージョンです。

ただし、VisualVM でアプリケーション ヒープを観察すると、メモリ リークが示されます。30分ほど走った後、java.lang.OutOfMemoryError: GC overhead limit exceeded

以下は、アプリを約 30 分間実行したときのヒープ使用量プロットのスクリーンショットです。(最後に平らにした部分が後OutOfMemoryError)

アプリケーションの VisualVM ヒープ プロット

私の質問は次のとおりです:メモリをリークすることなく、おそらく無限の数のキーで monix のイベントをグループ化するにはどうすればよいですか? 古い鍵はドロップできます

背景情報:

  • モニクスのバージョン:3.0.0-RC2
  • スカラバージョン:2.12.8
0 投票する
1 に答える
120 参照

scala - Monix TaskLocal は、別のレキシカル スコープから Task by によって参照された場合、bind() 呼び出しからの値を反映していないようです。

TaskLocal の ScalaDoc にあるコード スニペットをわずかに変更したコード ブロックがあります [オリジナルはこちら: https://monix.io/api/3.0/monix/eval/TaskLocal.html ]。

私のコードは、インライン タスク (for 内包表記内) を、for 内包表記の範囲外で定義された Task への参照に置き換えるだけです。残念ながら、私のタスクを実行すると、バインディングは表示されず、代わりに元の値 (0) が表示されます。少し変更したコード (以下) が ScalaDoc のスニペットと同じ結果になることを期待していました。しかし、予期せず、value3: 200 ではなく、"value3: 4" を取得します。tlocal への参照は、デフォルトに初期化され、バインディングについて何も知らない、そのスレッドローカルっぽい変数の別の「バージョン」を取得しているようです。 .

コード

根拠:

これを行う理由は、値を自分のスレッド ローカルにバインドし、タスクのマップ チェーンを作成して、これらのタスクの一部がスレッド ローカルの現在の値を取得するためです。そして、明確にするために、これらのタスクのいくつかは、私のコードベースの他のクラスで定義されており、理解のために特定の範囲内にインライン化されていません。

ありがとう !/クリス