0

cockroachdb (postgres db) へのラムダ通知を含む minio/s3 オブジェクト ストアがあります。以下のgolangコードでこれらのイベントを監視しようとしています。

package main

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "github.com/lib/pq"
    "time"
)

const (
    //crdbConnectStr = "dbname=alerts user=crdbuser1 host=localhost port=26257 sslmode=disable connect_timeout=5"
    crdbConnectStr = "postgres://crdbuser1@localhost:26257/alerts?sslmode=disable"
    dbDriver       = "postgres"
)

func monitorEvents() {

    _, err := sql.Open(dbDriver, crdbConnectStr)
    if err != nil {
        fmt.Printf("connection open to crdb failed - %v\n", err.Error())
    }

    fmt.Printf("sql open on crdb OK\n")

    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Printf("NewListener - event : %v, err - %v\n", ev, err.Error())
        }
    }

    minReconnect := 2 * time.Second
    maxReconnect := 20 * time.Second
    listener := pq.NewListener(crdbConnectStr, minReconnect, maxReconnect, reportProblem)
    err = listener.Listen("monitor")
    if err != nil {
        fmt.Printf("Listen error - %v\n", err.Error())
        return
    }

    fmt.Printf("begin monitoring events in CRDB\n")

    for {
        waitForAlertEvents(listener)
    }
}

// Record holds json data from object.
type Record struct {
    Data struct {
        Value struct {
            Records []struct {
                S3 struct {
                    Bucket struct {
                        Name string `json:"name"`
                    } `json:"bucket"`
                    Object struct {
                        Key string `json:"key"`
                    } `json:"object"`
                } `json:"s3"`
            } `json:"Records"`
        } `json:"value"`
    } `json:"data"`
}

func waitForAlertEvents(l *pq.Listener) {

    for {
        select {
        case n := <-l.Notify:
            fmt.Printf("Received data from channel [%v]\n", n.Channel)
            // Prepare notification payload for pretty print
            fmt.Println(n.Extra)
            record := Record{}

            jerr := json.Unmarshal([]byte(n.Extra), &record)
            if jerr != nil {
                fmt.Println("Error processing JSON: ", jerr)
                return
            }

            bucket := record.Data.Value.Records[0].S3.Bucket.Name
            object := record.Data.Value.Records[0].S3.Object.Key
            fmt.Printf("received event on bucket: %v, object: %v\n", bucket, object)

            return

        case <-time.After(60 * time.Second):
            fmt.Println("Received no events for 90 seconds, checking connection")
            go func() {
                l.Ping()
            }()
            return
        }
    }
}

func main() {
    monitorAlerts()
}

このプログラムを実行すると、以下のエラーが表示されてスタックします。

[root]# ./alerts 
sql open on crdb OK
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"

cockroachdb への接続は手動で正常に動作します。

[root]# cockroach sql --insecure --user=crdbuser1

crdbuser1@:26257/defaultdb> show databases;                                                                                                               database_name  
+---------------+
  alerts         
(1 row)

Time: 1.22359ms

crdbuser1@:26257/defaultdb> set database=alerts;
SET

Time: 363.994µs

crdbuser1@:26257/alerts> show tables;
  table_name  
+------------+
  alertstab   
(1 row)

Time: 1.399014ms

crdbuser1@:26257/alerts> 

エラーの理由についての考えpq: syntax error at or near "listen"。また、私はpqソースを見ていましたが、エラーはnotify.go#L756に関連している可能性が最も高いです

4

1 に答える 1