次のプログラムがあります。
package main
import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"
func main() {
runCatFromStdinWorks(populateStdin("aaa\n"))
runCatFromStdinWorks(populateStdin("bbb\n"))
}
func populateStdin(str string) func(io.WriteCloser) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
io.Copy(stdin, bytes.NewBufferString(str))
}
}
func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
go func() {
// Removing the following lines allow some output
// to be fetched from cat's stdout sometimes
time.Sleep(5 * time.Second)
io.Copy(os.Stdout, stdout)
}()
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}
ループで実行すると、次のように結果が得られません。
$ while true; do go run cat_thingy.go; echo ; done
^C
この結果は、仮想マシン (go バージョン go1) の apt から Ubuntu 12.04 に golang-go をインストールした後に得られます。Macbook Air (go バージョン go1.0.3) で go インストールを複製できませんでした。ある種の競合状態のようです。実際、sleep(1*time.Second) を設定すると、コードでランダムなスリープを犠牲にして問題が発生することはありません。
コードで間違っていることがありますか、それともバグですか? バグなら修正された?
更新: 考えられる手がかり
Command.Wait は、まだ未読のデータがある場合でも、cat サブプロセスとの間で通信するためのパイプを閉じることがわかりました。それを処理する適切な方法についてはよくわかりません。標準入力への書き込みが完了したときに通知するチャネルを作成できると思いますが、標準出力パイプに他に何も書き込まれないことを確認するために、cat プロセスが終了したかどうかを知る必要があります。cmd.Process.Wait を使用してプロセスがいつ終了するかを判断できることはわかっていますが、cmd.Wait を呼び出しても安全ですか?
更新: 近づく
これがコードの新しいカットです。これは、標準入力への書き込みと標準出力からの読み取りに関しては機能すると思います。ストリーミングするものなしで標準出力処理ゴルーチンから io.Copy を置き換えると、(すべてをバッファリングするのではなく) データを適切にストリーミングできると思います。
package main
import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"
const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6
func main() {
runtime.GOMAXPROCS(5)
runCatFromStdin(populateStdin(numInputBlocks))
}
func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
for i := 0; i < numInputBlocks; i++ {
repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
fmt.Printf("%s\n", repeatedBytes)
io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
}
}
}
func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
output_done_channel := make(chan bool)
go func() {
out_bytes := new(bytes.Buffer)
io.Copy(out_bytes, stdout)
fmt.Printf("%s\n", out_bytes)
fmt.Println(out_bytes.Len())
fmt.Println(inputBufferBlockLength*numInputBlocks)
output_done_channel <- true
}()
<-output_done_channel
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}