2

scalaz ストリームから外部プログラムにデータを送信し、その項目の結果を 100ms 程度で取得できるようにしたいと考えています。Sink出力ストリームを入力ストリームで圧縮し、副作用Processを捨てることで、以下のコードでこれを行うことができましたが、この解決策は非常に脆弱であると感じています。Sink

外部プログラムの入力項目の 1 つにエラーがあると、すべてが同期しなくなります。エラーが発生した場合に再同期できるように、何らかの増分 ID を外部プログラムに送信し、将来的にエコーバックできるようにするのが最善の策だと思います。

私が抱えている主な問題は、データを外部プログラムに送信した結果とプログラムProcess[Task, Unit]の出力を結合することProcess[Task, String]です。何かを使用する必要があるように感じますwynが、よくわかりません。

import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(1000)
     System.currentTimeMillis().toString
  })

  val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
  val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

  val in: Process[Task, Unit] = source to printLines

  val zip: Process[Task, (Unit, String)] = in.zip(linesR)
  val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
  out.run.run
}
4

1 に答える 1

0

より高度なタイプをもう少し深く掘り下げた後。Exchangeそれはまさに私が望むことをするように見えます。

import java.io.PrintStream

import scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.io._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(100)
     System.currentTimeMillis().toString
  })

  val read: stream.Process[Task, String] = linesR(program.getInputStream)
  val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
  val exchange: Exchange[String, String] = Exchange(read, write)
  println(exchange.run(source).take(10).runLog.run)
}
于 2015-06-02T07:47:38.500 に答える