3

MongoDB のレプリケーション oplog を読み取り、結果を Go 構造にシリアル化し、それをチャネルに送信して処理するために取り組んでいる単純なアプリケーションがあります。現在、そのチャネルから読み取り、構造内の値を単純に出力しています。

for/rangeを使用してチャネルから値を読み取り、そこから直接単純に読み取り、タイムアウトのある選択の中に入れてみました。結果はすべて同じです。コードを実行するたびに、チャネルから異なる結果が得られます。チャネルが書き込まれるたびに1回だけ表示されますが、そのチャネルから読み取ると、1回の書き込みでも同じ値を1〜3回、場合によっては4回読み取ることがあります。

これは通常、初期ロード (古いレコードのプル) でのみ発生し、チャネルへのライブ追加を読み取るときには発生しないようです。最初の読み取り時にアイテムが削除される前に、チャネルからの読み取りが速すぎるという問題はありますか?

package main

import (
    "fmt"
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type Operation struct {
    Id        int64  `bson:"h" json:"id"`
    Operator  string `bson:"op" json:"operator"`
    Namespace string `bson:"ns" json:"namespace"`
    Select    bson.M `bson:"o" json:"select"`
    Update    bson.M `bson:"o2" json:"update"`
    Timestamp int64  `bson:"ts" json:"timestamp"`
}

func Tail(collection *mgo.Collection, Out chan<- *Operation) {
    iter := collection.Find(nil).Tail(-1)
    var oper *Operation

    for {
        for iter.Next(&oper) {
            fmt.Println("\n<<", oper.Id)
            Out <- oper
        }

        if err := iter.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

func main() {
    session, err := mgo.Dial("127.0.0.1")

    if err != nil {
        panic(err)
    }
    defer session.Close()

    c := session.DB("local").C("oplog.rs")

    cOper := make(chan *Operation, 1)

    go Tail(c, cOper)

    for operation := range cOper {
        fmt.Println()
        fmt.Println("Id: ", operation.Id)
        fmt.Println("Operator: ", operation.Operator)
        fmt.Println("Namespace: ", operation.Namespace)
        fmt.Println("Select: ", operation.Select)
        fmt.Println("Update: ", operation.Update)
        fmt.Println("Timestamp: ", operation.Timestamp)
    }
}
4

2 に答える 2