0

だから私はいくつかのパイプラインを持っています。2 つのファイルを行ごとに比較し、データベースへのロードなどを行う必要があります。単一行を処理するためのタイムアウトを設定し、到達した場合はパイプラインを中断したいと考えています。エンドポイントに入るのは functionRun()です。ここでは、いくつかの検証を行い、コマンドで構造を作成します

その後、fuctionrun()で、単一の goroutine で stdout の処理を​​開始commし、ubuntu でメイン プロセス コマンドを実行します。まあ、それは正常に動作し、私はそれをレースにチェックしました: go build --race, go run --race. 出力に問題はありません。しかし、単純なロジックをチェックするテストがあります。ほぼ問題なく動作しますが、10 回実行する前に 1 回失敗します。これを調査した後、このテストで競合状態が見つかりました。誰かが説明できますか、何がうまくいかないのですか?

入る

func (cmWk *CommWorker) Run(path1, path2, category, subcategory string) error {
    err := cmWk.opt.validate()
    if err != nil {
        return err
    }
    if notExistFile(path1) {
        return &AESCommError{fmt.Sprintf(
            "Check your file path [%s].It is invalid", path1),
        }
    }
    if isDir(path1) {
        return &AESCommError{fmt.Sprintf(
            "Check your file path [%s].It is invalid", path1),
        }
    }
    // If path2 does not exist, we create this file
    if notExistFile(path2) {
        err := createIfNotExist(path2)
        if err != nil {
            return err
        }
    }
    cmWk.category = category
    cmWk.subcategory = subcategory
    cmWk.aesComm = NewAESComm(path1, path2, cmWk.opt)
    return cmWk.run()
}
// NewAESComm returns AESComm structure with described options
func NewAESComm(path1, path2 string, options *AESCommOptions) *AESComm {
    arguments := append(getArguments(options), path1, path2)
    return &AESComm{
        FilePath1: path1,
        FilePath2: path2,
        cmd:       exec.Command("comm", arguments...),
    }
}
// Run starts comm process with options
func (cmWk *CommWorker) run() error {
    stdout, err := cmWk.aesComm.StdoutPipe()
    if err != nil {
        return err
    }
    defer stdout.Close()

    go cmWk.processStdout(stdout)

    return cmWk.aesComm.Run()
}
// Run start comm task
func (comm AESComm) Run() error {
    if err := comm.cmd.Start(); err != nil {
        return err
    }
    return comm.cmd.Wait()
}
func (cmWk *CommWorker) processStdout(stdout io.ReadCloser) {
    scanner := bufio.NewScanner(stdout)
    timer := time.NewTimer(cmWk.pTimeout)
    erCh := make(chan error, 1)
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "#") {
            continue
        }
        if err := cmWk.process(line, timer, erCh); err != nil {
            cmWk.interrupt(err)
            return
        }

        timer.Reset(cmWk.pTimeout)
    }
    if err := cmWk.clear(erCh); err != nil {
        cmWk.interrupt(err)
    }
}
func (cmWk *CommWorker) process(line string, timer *time.Timer, erCh chan error) error {
    erCh <- cmWk.processor.ProcessLine(line, cmWk.category, cmWk.subcategory)
    select {
    case <-timer.C:
        return fmt.Errorf("got timeout error handling line")
    default:
        return <-erCh
    }
}
func (cmWk *CommWorker) interrupt(err error) {
    cmWk.Error = err
    _ = cmWk.aesComm.cmd.Process.Signal(signal)
}

壊れたテスト

func TestCommWorker_Run5(t *testing.T) {
    proc := &testUtils.TestProcessor{Fail: true, Lines: []string{}} // simple test processor that fails
    createWriteFile(srs, fp1)
    wk := NewCommWorker(&AESCommOptions{CheckOrder: true, PTimeOut: pTimeout}, proc)
    err := wk.Run(fp1, fp2, testCategory, testSubCategory)
    assert.Nil(t, err)
    assert.Error(t, wk.Err()) // race condition
}

出力

==================
WARNING: DATA RACE
Read at 0x00c00048e3a8 by goroutine 15:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Err()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:144 +0x4b9
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:103 +0x4ca
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Previous write at 0x00c00048e3a8 by goroutine 19:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:119 +0x22e
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
==================
--- FAIL: TestCommWorker_Run5 (0.08s)
    testing.go:1038: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c0005160a0 by goroutine 19:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x2ad
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Previous write at 0x00c0005160a0 by goroutine 15:
  os/exec.(*Cmd).Start()
      /usr/local/go/src/os/exec/exec.go:422 +0x8e4
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236
==================
==================
WARNING: DATA RACE
Read at 0x00c0004b5890 by goroutine 19:
  os.(*Process).signal()
      /usr/local/go/src/os/exec_unix.go:65 +0x64
  os.(*Process).Signal()
      /usr/local/go/src/os/exec.go:131 +0x2f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x256
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219

Previous write at 0x00c0004b5890 by goroutine 15:
  os.newProcess()
      /usr/local/go/src/os/exec.go:25 +0x5ee
  os.startProcess()
      /usr/local/go/src/os/exec_posix.go:62 +0x668
  os.StartProcess()
      /usr/local/go/src/os/exec.go:102 +0x92
  os/exec.(*Cmd).Start()
      /usr/local/go/src/os/exec/exec.go:422 +0x8af
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 19 (running) created at:
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
  aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
      /home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 15 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:73 +0x236
==================
FAIL
exit status 1

4

1 に答える 1