問題タブ [fs2]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - StateT[IO, _, _] の FS2 ストリーム、定期的に状態をダンプ
無限のデータ ストリームを消費するプログラムがあります。途中で、単純な合計と平均であるため、モノイドを形成するいくつかのメトリックを記録したいと思います。定期的に、これらのメトリックをどこかに書き出してクリアし、集計に戻りたいと考えています。私は本質的に持っています:
したがって、実行のほとんどはIO
直接使用し、使用して持ち上げStateT.liftF
ます。また、特定の状況では、 への呼び出しをいくつか含めますrecordMetric
。その最後に、ストリームがあります:
そして、定期的に、たとえば毎分程度、メトリックをダンプしたいので、試しました:
run
そして、開始状態で呼び出すという通常のトップレベルのプログラム処理を行い、次に を呼び出しunsafeRunSync
ます。
問題は、空のメトリックしか表示されないことです! 私のモノイドが暗黙的に空のメトリクスを提供していると思われますがsendStream
、それがなぜなのか、またはそれを修正する方法がよくわかりません。sendMetrics
代わりに、これらの呼び出しをメインストリームに「インターリーブ」できる方法があるでしょうか?
編集:これは最小限の完全な実行可能な例です:
今私がする場合:
次に、期待される結果が得られます-状態が出力に適切に蓄積されます。しかし、もしそうなら:
その後、一貫して空のリストが出力されます。部分的なリスト (約 2 要素) が出力されると予想していました。
scala - 未読にする FS2 ストリーム
その入力ストリームを http フレームワーク (Finch および Akka Http) に渡すことができるように変換fs2.Stream
したいと思います。java.io.InputStream
が見つかりましたfs2.io.toInputStream
が、これは機能しません (何も出力されません)。
私が理解している限り、実行時に.unsafeRunSync()
ストリーム全体を消費しているためSeq[InputStream]
、下にある入力ストリームが返されても、すでに消費されています。
fs2.Stream[IO, Byte]
消費せjava.io.InputStream
ずに変換できる方法はありますか?
タナクス!