12

次のプログラムがあります。

package main

import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    go func() {
            // Removing the following lines allow some output
            // to be fetched from cat's stdout sometimes
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
    }()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

ループで実行すると、次のように結果が得られません。

$ while true; do go run cat_thingy.go; echo ; done



^C

この結果は、仮想マシン (go バージョン go1) の apt から Ubuntu 12.04 に golang-go をインストールした後に得られます。Macbook Air (go バージョン go1.0.3) で go インストールを複製できませんでした。ある種の競合状態のようです。実際、sleep(1*time.Second) を設定すると、コードでランダムなスリープを犠牲にして問題が発生することはありません。

コードで間違っていることがありますか、それともバグですか? バグなら修正された?

更新: 考えられる手がかり

Command.Wait は、まだ未読のデータがある場合でも、cat サブプロセスとの間で通信するためのパイプを閉じることがわかりました。それを処理する適切な方法についてはよくわかりません。標準入力への書き込みが完了したときに通知するチャネルを作成できると思いますが、標準出力パイプに他に何も書き込まれないことを確認するために、cat プロセスが終了したかどうかを知る必要があります。cmd.Process.Wait を使用してプロセスがいつ終了するかを判断できることはわかっていますが、cmd.Wait を呼び出しても安全ですか?

更新: 近づく

これがコードの新しいカットです。これは、標準入力への書き込みと標準出力からの読み取りに関しては機能すると思います。ストリーミングするものなしで標準出力処理ゴルーチンから io.Copy を置き換えると、(すべてをバッファリングするのではなく) データを適切にストリーミングできると思います。

package main

import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
        for i := 0; i < numInputBlocks; i++ {
          repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
          fmt.Printf("%s\n", repeatedBytes)
          io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf("%s\n", out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel <- true
    }()
    <-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
4

2 に答える 2

5

これが機能する最初のコードのバージョンです。コマンドを閉じる前に、goルーチンの送信と受信を確実に終了するために、sync.WaitGroupが追加されていることに注意してください。

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
    "sync"
    "time"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        populate_stdin_func(stdin)
    }()
    go func() {
        defer wg.Done()
        time.Sleep(5 * time.Second)
        io.Copy(os.Stdout, stdout)
    }()
    wg.Wait()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

(これは@peterSOが言ったことを言う別の方法です;-)

于 2013-02-11T18:41:57.560 に答える
0

Go ステートメント

「go」ステートメントは、関数またはメソッド呼び出しの実行を、同じアドレス空間内で制御の独立した並行スレッドまたはゴルーチンとして開始します。

GoStmt = "go" 式 .

式は呼び出しでなければなりません。関数の値とパラメーターは、呼び出し元のゴルーチンで通常どおり評価されますが、通常の呼び出しとは異なり、プログラムの実行は、呼び出された関数が完了するまで待機しません。代わりに、関数は新しいゴルーチンで独立して実行を開始します。関数が終了すると、ゴルーチンも終了します。関数に戻り値がある場合、それらは関数の完了時に破棄されます。

不要なゴルーチンを関数呼び出しに変換します。

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    populate_stdin_func(stdin)
    io.Copy(os.Stdout, stdout)
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
于 2013-02-10T22:49:40.573 に答える