0

大量のテキスト ファイルから一意のリストを取得するタスクを Go で作成しました。チャネルを使用していくつかの並列化を行いましたが、現在、一貫性のない結果が得られています。同じ入力ファイルで毎回出力される/出力されない5つのレコードの差異です。

go run process.go | wc -lFedora x86_64、go1.1.2、8コアAMDでテストしています。

コードは次のとおりです。

package main

import (
    "fmt"
    "os"
    "io"    
    "encoding/csv"
    "regexp"
    "log"
)

var (
    cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
    comma rune ='\t'
    fieldsPerRecord=-1
)

func clean(s string) string {
    clean:=cleanRe.ReplaceAllLiteralString(s,"")
    if len(clean)<6 {return ""}
    return clean
}

func uniqueChannel(inputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-"Input digester."}()
    uniq:=make(map[string]map[string]bool)
    i:=0
    for record:= range inputChan {
        i++
        id,v:=record[0],record[1]
        if uniq[id]==nil {
            uniq[id]=make(map[string]bool)
        } else if !uniq[id][v] {
            uniq[id][v]=true
            fmt.Println(id,string(comma),v)
        }
    }
    log.Println("digest ", i)
}

func processFile(fileName string, outputChan chan []string, controlChan chan string) {
    defer func(){controlChan<-fileName}()
    f,err:=os.Open(fileName)
    if err!=nil{log.Fatal(err)}
    r:=csv.NewReader(f)
    r.FieldsPerRecord = fieldsPerRecord
    r.Comma = comma

    //  Process the records
    i:=0
    for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
        if err!=nil{continue}
        id:=record[0]
        for _,v:=range record[1:] {
            if cleanV:=clean(v);cleanV!=""{
                i++
                outputChan<-[]string{id,cleanV}
            }
        }
    }
    log.Println(fileName,i)
}


func main() {
    inputs:=[]string{}
    recordChan:=make(chan []string,100)
    processesLeft:=len(inputs)+1
    controlChan:=make(chan string,processesLeft)

    //  Ingest the inputs
    for _,fName:=range inputs {
        go processFile(fName,recordChan,controlChan)
    }

    //  This is the loop to ensure it's all unique
    go uniqueChannel(recordChan,controlChan)

    //  Make sure all the channels close up
    for processesLeft>0 {
        if processesLeft==1{
            close(recordChan)
        }
        c:=<-controlChan
        log.Println(c)
        processesLeft--
    }
    close(controlChan)
}

空っぽになる前にチャネルを閉じるようです。閉鎖メカニズムがなければ、デッドロックが発生していました-アイデアがありません。

4

1 に答える 1