だから私はいくつかのパイプラインを持っています。2 つのファイルを行ごとに比較し、データベースへのロードなどを行う必要があります。単一行を処理するためのタイムアウトを設定し、到達した場合はパイプラインを中断したいと考えています。エンドポイントに入るのは functionRun()
です。ここでは、いくつかの検証を行い、コマンドで構造を作成します
その後、fuctionrun()
で、単一の goroutine で stdout の処理を開始comm
し、ubuntu でメイン プロセス コマンドを実行します。まあ、それは正常に動作し、私はそれをレースにチェックしました:
go build --race
, go run --race
. 出力に問題はありません。しかし、単純なロジックをチェックするテストがあります。ほぼ問題なく動作しますが、10 回実行する前に 1 回失敗します。これを調査した後、このテストで競合状態が見つかりました。誰かが説明できますか、何がうまくいかないのですか?
入る
func (cmWk *CommWorker) Run(path1, path2, category, subcategory string) error {
err := cmWk.opt.validate()
if err != nil {
return err
}
if notExistFile(path1) {
return &AESCommError{fmt.Sprintf(
"Check your file path [%s].It is invalid", path1),
}
}
if isDir(path1) {
return &AESCommError{fmt.Sprintf(
"Check your file path [%s].It is invalid", path1),
}
}
// If path2 does not exist, we create this file
if notExistFile(path2) {
err := createIfNotExist(path2)
if err != nil {
return err
}
}
cmWk.category = category
cmWk.subcategory = subcategory
cmWk.aesComm = NewAESComm(path1, path2, cmWk.opt)
return cmWk.run()
}
// NewAESComm returns AESComm structure with described options
func NewAESComm(path1, path2 string, options *AESCommOptions) *AESComm {
arguments := append(getArguments(options), path1, path2)
return &AESComm{
FilePath1: path1,
FilePath2: path2,
cmd: exec.Command("comm", arguments...),
}
}
// Run starts comm process with options
func (cmWk *CommWorker) run() error {
stdout, err := cmWk.aesComm.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
go cmWk.processStdout(stdout)
return cmWk.aesComm.Run()
}
// Run start comm task
func (comm AESComm) Run() error {
if err := comm.cmd.Start(); err != nil {
return err
}
return comm.cmd.Wait()
}
func (cmWk *CommWorker) processStdout(stdout io.ReadCloser) {
scanner := bufio.NewScanner(stdout)
timer := time.NewTimer(cmWk.pTimeout)
erCh := make(chan error, 1)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "#") {
continue
}
if err := cmWk.process(line, timer, erCh); err != nil {
cmWk.interrupt(err)
return
}
timer.Reset(cmWk.pTimeout)
}
if err := cmWk.clear(erCh); err != nil {
cmWk.interrupt(err)
}
}
func (cmWk *CommWorker) process(line string, timer *time.Timer, erCh chan error) error {
erCh <- cmWk.processor.ProcessLine(line, cmWk.category, cmWk.subcategory)
select {
case <-timer.C:
return fmt.Errorf("got timeout error handling line")
default:
return <-erCh
}
}
func (cmWk *CommWorker) interrupt(err error) {
cmWk.Error = err
_ = cmWk.aesComm.cmd.Process.Signal(signal)
}
壊れたテスト
func TestCommWorker_Run5(t *testing.T) {
proc := &testUtils.TestProcessor{Fail: true, Lines: []string{}} // simple test processor that fails
createWriteFile(srs, fp1)
wk := NewCommWorker(&AESCommOptions{CheckOrder: true, PTimeOut: pTimeout}, proc)
err := wk.Run(fp1, fp2, testCategory, testSubCategory)
assert.Nil(t, err)
assert.Error(t, wk.Err()) // race condition
}
出力
==================
WARNING: DATA RACE
Read at 0x00c00048e3a8 by goroutine 15:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Err()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:144 +0x4b9
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:103 +0x4ca
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
Previous write at 0x00c00048e3a8 by goroutine 19:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:119 +0x22e
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219
Goroutine 15 (running) created at:
testing.(*T).Run()
/usr/local/go/src/testing/testing.go:1168 +0x5bb
testing.runTests.func1()
/usr/local/go/src/testing/testing.go:1439 +0xa6
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
testing.runTests()
/usr/local/go/src/testing/testing.go:1437 +0x612
testing.(*M).Run()
/usr/local/go/src/testing/testing.go:1345 +0x3b3
main.main()
_testmain.go:73 +0x236
Goroutine 19 (running) created at:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
==================
--- FAIL: TestCommWorker_Run5 (0.08s)
testing.go:1038: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c0005160a0 by goroutine 19:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x2ad
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219
Previous write at 0x00c0005160a0 by goroutine 15:
os/exec.(*Cmd).Start()
/usr/local/go/src/os/exec/exec.go:422 +0x8e4
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
Goroutine 19 (running) created at:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
Goroutine 15 (running) created at:
testing.(*T).Run()
/usr/local/go/src/testing/testing.go:1168 +0x5bb
testing.runTests.func1()
/usr/local/go/src/testing/testing.go:1439 +0xa6
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
testing.runTests()
/usr/local/go/src/testing/testing.go:1437 +0x612
testing.(*M).Run()
/usr/local/go/src/testing/testing.go:1345 +0x3b3
main.main()
_testmain.go:73 +0x236
==================
==================
WARNING: DATA RACE
Read at 0x00c0004b5890 by goroutine 19:
os.(*Process).signal()
/usr/local/go/src/os/exec_unix.go:65 +0x64
os.(*Process).Signal()
/usr/local/go/src/os/exec.go:131 +0x2f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).interrupt()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:120 +0x256
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).processStdout()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:107 +0x219
Previous write at 0x00c0004b5890 by goroutine 15:
os.newProcess()
/usr/local/go/src/os/exec.go:25 +0x5ee
os.startProcess()
/usr/local/go/src/os/exec_posix.go:62 +0x668
os.StartProcess()
/usr/local/go/src/os/exec.go:102 +0x92
os/exec.(*Cmd).Start()
/usr/local/go/src/os/exec/exec.go:422 +0x8af
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.AESComm.Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWrapper.go:93 +0x3c
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:56 +0x1fa
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
Goroutine 19 (running) created at:
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:54 +0x169
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.(*CommWorker).Run()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker.go:94 +0x4f7
aesgit.devintermedia.net/aescore/sophosrs/commWrapper.TestCommWorker_Run5()
/home/konstantin/go/src/SophosRS/commWrapper/commWorker_test.go:101 +0x454
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
Goroutine 15 (running) created at:
testing.(*T).Run()
/usr/local/go/src/testing/testing.go:1168 +0x5bb
testing.runTests.func1()
/usr/local/go/src/testing/testing.go:1439 +0xa6
testing.tRunner()
/usr/local/go/src/testing/testing.go:1123 +0x202
testing.runTests()
/usr/local/go/src/testing/testing.go:1437 +0x612
testing.(*M).Run()
/usr/local/go/src/testing/testing.go:1345 +0x3b3
main.main()
_testmain.go:73 +0x236
==================
FAIL
exit status 1