1

1000 万のレコードを mongodb に挿入する簡単な例を作成しました。順番に機能させることから始めました。次に、並行処理の方法を調べたところ、ゴルーチンが見つかりました。これは私が望んでいるように思えますが、期待どおりに動作していません。すべてのゴルーチンが終了する前にプログラムが終了するのをブロックする WaitGroup を実装しましたが、まだ問題があります。

そこで、何が起こっているのかから始めて、コードを示します。ゴルーチンなしでコードを実行すると、1,000 万件のレコードすべてが mongodb に正常に挿入されます。ただし、ゴルーチンを追加すると、不確定な量が入力されます..一般的には約8500で、数百です。mongodb ログをチェックして、問題が発生していないかどうかを確認しましたが、何も表示されていません。だから、ログに記録されていないだけなのかどうかはわかりません。とにかく、ここにコードがあります:

(補足: 一度に 1 つのレコードを処理していますが、将来的に複数のレコードを一度にテストできるようにメソッドに分割しました。mongodb でそれを行う方法がわかりません。まだ。)

package main

import (
  "fmt"
  "labix.org/v2/mgo"
  "strconv"
  "time"
  "sync"
)

// structs
type Reading struct {
  Id   string
  Name string
}

var waitGroup sync.WaitGroup

// methods
func main() {
  // Setup timer
  startTime := time.Now()

  // Setup collection
  collection := getCollection("test", "readings")
  fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // Setup readings
  readings := prepareReadings()
  fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

  // Insert readings
  for i := 1; i <= 1000000; i++ {
    waitGroup.Add(1)
    go insertReadings(collection, readings)

    // fmt.Print(".")

    if i % 1000 == 0 {
      fmt.Println("1000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
    }
  }
  waitGroup.Wait()

  fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func getCollection(databaseName string, tableName string) *mgo.Collection {
  session, err := mgo.Dial("localhost")

  if err != nil {
    // panic(err)
    fmt.Println("error getCollection:", err)
  }

  // defer session.Close()

  // Optional. Switch the session to a monotonic behavior.
  // session.SetMode(mgo.Monotonic, true)

  collection := session.DB(databaseName).C(tableName)

  return collection
}

func insertReadings(collection *mgo.Collection, readings []Reading) {
  err := collection.Insert(readings)

  if err != nil {
    // panic(err)
    fmt.Println("error insertReadings:", err)
  }

  waitGroup.Done()
}

func prepareReadings() []Reading {
  var readings []Reading
  for i := 1; i <= 1; i++ {
    readings = append(readings, Reading{Name: "Thing"})
  }

  return readings
}
4

2 に答える 2

5

プログラムの実行

完全なプログラムは、パッケージと呼ばれる単一のインポートされていないパッケージmainを、インポートするすべてのパッケージと推移的にリンクすることによって作成されます。mainパッケージにはパッケージ名が必要であり、引数を取らず値を返さないmain関数を宣言する必要があります。main

func main() { … }

プログラムの実行は、mainパッケージを初期化し、関数を呼び出すことによって開始されますmain。関数mainが戻ると、プログラムは終了します。main他の (非)goroutinesが完了するのを待ちません。

問題の単純で簡潔な、コンパイル可能で実行可能な例を提供してくれませんでした。これは、機能するコードの簡素化されたバージョンです。

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

// structs
type Reading struct {
    Id   string
    Name string
}

var waitGroup sync.WaitGroup

func main() {
    // Setup timer
    startTime := time.Now()

    // Setup readings
    readings := prepareReadings()
    fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))

    // Insert readings
    for i := 1; i <= 1000000; i++ {
        waitGroup.Add(1)
        go insertReadings(readings)

        // fmt.Print(".")

        if i%100000 == 0 {
            fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
        }
    }
    waitGroup.Wait()

    fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}

func insertReadings(readings []Reading) {
    waitGroup.Done()
}

func prepareReadings() []Reading {
    var readings []Reading
    for i := 1; i <= 1; i++ {
        readings = append(readings, Reading{Name: "Thing"})
    }
    return readings
}

出力:

readings prepared: 0.00
100000 readings queued for insert: 0.49
100000 readings queued for insert: 1.12
100000 readings queued for insert: 1.62
100000 readings queued for insert: 2.54
100000 readings queued for insert: 3.05
100000 readings queued for insert: 3.56
100000 readings queued for insert: 4.06
100000 readings queued for insert: 5.57
100000 readings queued for insert: 7.15
100000 readings queued for insert: 8.78
all readings inserted: 34.76

ここで、プログラムを 1 つずつ作成し、どこで失敗し始めるかを確認します。

于 2013-04-27T11:55:35.837 に答える