scalaz ストリームから外部プログラムにデータを送信し、その項目の結果を 100ms 程度で取得できるようにしたいと考えています。Sink
出力ストリームを入力ストリームで圧縮し、副作用Process
を捨てることで、以下のコードでこれを行うことができましたが、この解決策は非常に脆弱であると感じています。Sink
外部プログラムの入力項目の 1 つにエラーがあると、すべてが同期しなくなります。エラーが発生した場合に再同期できるように、何らかの増分 ID を外部プログラムに送信し、将来的にエコーバックできるようにするのが最善の策だと思います。
私が抱えている主な問題は、データを外部プログラムに送信した結果とプログラムProcess[Task, Unit]
の出力を結合することProcess[Task, String]
です。何かを使用する必要があるように感じますwyn
が、よくわかりません。
import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._
object Main extends App {
/*
# echo.sh just prints to stdout what it gets on stdin
while read line; do
sleep 0.1
echo $line
done
*/
val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")
val source: Process[Task, String] = Process.repeatEval(Task{
Thread.sleep(1000)
System.currentTimeMillis().toString
})
val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))
val in: Process[Task, Unit] = source to printLines
val zip: Process[Task, (Unit, String)] = in.zip(linesR)
val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
out.run.run
}