1

受信したイベントに関する統計を蓄積し、完了すると最終的な統計を発行する scalaz-stream チャネルを実装しようとしています。

具体的で簡単な例を挙げるとProcess[Task, String]、各文字列が単語である があるとします。Channel[Task, String, (String, Int)]その初期プロセスに適用すると、それを排出し、各単語の出現回数をカウントし、それを出力したいと思います。

私はこれが折りたたみを通して簡単であることを理解しています:

input.foldMap(w => Map(w -> 1))
     .flatMap(m => Process.emitAll(m.toSeq))
     .maximumBy(_._2)

私が書こうとしているのは、明示的にフォールドするのではなく、プロセスをパイプするだけでよい標準アキュムレータのコレクションです。たとえば、次のように記述します。

input.through(wordFrequency)
     .maximumBy(_._2)

私は少し途方に暮れています-状態を共有せずにそうする方法を理解することはできません. Sinkaに蓄積する a を書くのMap[String, Int]はかなり簡単ですが、マップの最終状態を取得して、シンクが終了した後にそれを出力する方法はありません。

4

0 に答える 0