問題タブ [scalaz-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - scalazストリームを使用して外部プロセスにどのように書き込み、外部プロセスから読み取るのですか?
scalaz ストリームから外部プログラムにデータを送信し、その項目の結果を 100ms 程度で取得できるようにしたいと考えています。Sink
出力ストリームを入力ストリームで圧縮し、副作用Process
を捨てることで、以下のコードでこれを行うことができましたが、この解決策は非常に脆弱であると感じています。Sink
外部プログラムの入力項目の 1 つにエラーがあると、すべてが同期しなくなります。エラーが発生した場合に再同期できるように、何らかの増分 ID を外部プログラムに送信し、将来的にエコーバックできるようにするのが最善の策だと思います。
私が抱えている主な問題は、データを外部プログラムに送信した結果とプログラムProcess[Task, Unit]
の出力を結合することProcess[Task, String]
です。何かを使用する必要があるように感じますwyn
が、よくわかりません。
scala - scalaz.stream でデータベースの結果を継続的に取得する
私はscalaに不慣れで、scalazに非常に慣れていません。別のスタックオーバーフローの回答といくつかのハンドホールディングにより、scalaz.stream を使用して、Twitter API の結果を継続的にフェッチするプロセスを実装することができました。ここで、Twitter ハンドルが格納されている Cassandra DB に対して同じことを行いたいと思います。
Twitter の結果を取得するためのコードは次のとおりです。
私がやろうとしていることは、
Cassandra の結果を (awakeEvery を使用して) 継続的にフェッチし、アクターに送信して上記の twitter フェッチ コードを実行します。
私の質問は、これを scalaz.stream で実装する最良の方法は何ですか? すべてのデータベース結果を取得してから、すべてのデータベース結果を再度取得する前に遅延が必要であることに注意してください。上記の Twitter フェッチ コードと同じアーキテクチャを使用する必要がありますか? もしそうなら、入力を必要としない channel.lift を作成するにはどうすればよいですか? scalaz.stream でもっと良い方法はありますか?
前もって感謝します
scala - プロセスが中断されたときにコンソール入力を強制終了する方法
私は Scalaz Stream ライブラリで遊んでいて、単純なコンソール アプリを作成しようとしています。
チュートリアルのscalaz ストリームに従いましたが、コンソールの読み取りと書き込みの例があります。
しかし、どうすればいいのかわからないという奇妙な問題に直面しました。
これは私のコードです:
実行すると、出力は次のようになります
1 は問題なく、2 は 5 秒後にタイムアウトになり、3 は 2 回入力する必要があります。
問題は、プロセスが時間通りに失敗した場合、コンソール入力のブロックが解除されず (強制終了されず)、ユーザーの入力を待機していることです。
私は先物でそれを再現しようとしましたが、同じ問題に直面しました。
ありがとうございました。
scalaz-stream - scalaz-stream のメモリ効率
github ページの README の最初の例を考えてみましょう:
メモリ使用量の点でどれくらい効率的ですか? 結果を次のステップに渡す前に、各ステップでコンテンツ全体をメモリにバッファリングしますか? それとも、ストリーミング方式で行われますか? つまり、一定のメモリ使用量になりますか?
また、scalaz-stream は本番環境での使用を検討できる高品質のライブラリですか?
scala - scalaz-stream で wye.mergeLeftBiased を実装するにはどうすればよいですか?
次の関数を scalaz-stream で実装するのに問題があります:
私はそれを実装しようとしているブランチを持っていますが、残念ながら単純なケースでも機能します(テストケースで実証されているように)。in scalaz-streamの実装はwye
(少なくとも私にとっては) 簡単に読めるものではありません。
https://github.com/jedesah/scalaz-stream/tree/topic/mergeLeftBiased
scalaz-stream - scalaz-stream の非同期「ノード」
があり、各ストリームで実行時間が瞬時から非常に長いProcess[Task, A]
関数を実行して、 を生成する必要があります。A => B
A
Process[Task, B]
問題は、 s が受信された順序に関係なく、それぞれA
を an でできるだけ早く処理し、結果が得られたらすぐに渡したいということです。ExecutionContext
A
具体的な例は次のコードです。ここでは、すべての奇数がすぐに出力され、偶数が約 500 ミリ秒後に出力されることを願っています。代わりに発生するのは、(奇数、偶数) カップルが出力され、500 ミリ秒の一時停止でインターリーブされることです。
scala - Scalaz-stream チャンキング UP to N
次のようなキューが与えられます。
このキューを取り出して、最大 100 のチャンクでダウンストリーム シンクにストリーミングしたいと考えています。
ある程度は機能しますが、メッセージが101個ある場合、キューは空になりません。別の 99 がプッシュされない限り、1 つのメッセージが残ります。ダウンストリーム プロセスが処理できる速さで、最大 100 まで、できるだけ多くのメッセージをキューから取り出したいと考えています。
利用可能な既存のコンビネータはありますか?
scala - nondeterminism.njoin: maxQueued とプリフェッチ
njoin
処理前にデータをプリフェッチするのはなぜですか? プロセスのプロセスがどのようにマージされるかに関係がない限り、それは不必要な複雑さのように思えますか?
新しい要素が生成されるたびにエフェクトを実行するストリームがあります。私は効果を最小限に抑えたいのでnjoin
、たとえばmaxOpen = 4
4 を同時に生成する要素の最大数にする必要があります (すぐに処理できない限り、要素は生成されません)。
これを優雅に解決する方法はありnjoin
ますか?現在、「チケット」の制限付きキューを使用しています (要素は、チケットを取得した後にのみ生成されます)。
scalaz - チャネルを初期値から N 回適用する
関数f
とチャンネルがありますc
f
前の計算の出力に関数を任意の回数 (または無期限に)継続的に適用したいと思います。私は初期値を提供しています。
プロセスを定義できるp
ただし、これは一度だけ実行されます。
c
最後の計算の出力に適用し続けるにはどうすればよいですか?