-1

キネシス データ ストリームに AWS の golang SDK を使用する方法を学び始めたところです。Suzuken の投稿に従って、プロデューサーが cloudwatch logs である aws のデータ ストリームからレコードを取得するスクリプトを作成しました。

func suzukenGetRecords() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Create an Amazon S3 service client
    kc := kinesis.NewFromConfig(cfg)

    // Get the stream
    streams, err := kc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
        StreamName: aws.String("aws-go-starter"),
    })
    if err != nil {
        log.Fatal(err)
    }

    putOutput, err := kc.PutRecord(context.TODO(), &kinesis.PutRecordInput{
        Data:         []byte("hoge"),
        StreamName:   aws.String("aws-go-starter"),
        PartitionKey: aws.String("key1"),
    })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%v\n", putOutput)

    // retrieve iterator
    iteratorOutput, err := kc.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
        ShardId:                aws.String(*streams.StreamDescription.Shards[0].ShardId),
        ShardIteratorType:      "TRIM_HORIZON",
        StreamName:             aws.String("aws-go-starter"),
        StartingSequenceNumber: new(string),
        Timestamp:              &time.Time{},
    })
    if err != nil {
        log.Panic(err)
    }

    records, err := kc.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
        ShardIterator: iteratorOutput.ShardIterator,
    })
    if err != nil {
        panic(err)
    }
    fmt.Printf("%v\n", records)

}

「test1」、「test2」、「testtest」など、手動で Cloudwatch ログに入力したものを取得できると思っていましたが、代わりに一連の数字を取得しました。

2022/02/07 18:48:25 GetRecords Data: [31 139 8 0 0 0 0 0 0 0 93 143 77 107 2 49 16 134 255 75 206 10 153 124 76 38 222 22 186 245 212 211 122 43 82 162 134 37 224 110 150 36 86 138 248 223 59 149 122 241 250 60 243 190 51 115 19 83 172 53 140 113 247 179 68 177 17 111 221 174 251 250 232 135 161 219 246 98 37 242 117 142 133 49 121 141 202 42 139 142 44 227 115 30 183 37 95 22 54 225 90 215 99 94 215 22 74 227 209 135 27 90 137 97 98 249 79 129 113 189 28 234 177 164 165 165 60 191 167 51 211 42 54 159 175 241 253 35 223 127 199 185 253 233 155 72 39 174 209 136 72 202 104 231 201 32 73 13 206 146 113 64 30 189 51 142 164 67 235 1 180 84 164 128 180 145 222 27 75 210 16 175 109 137 255 107 97 226 83 1 141 81 40 61 120 112 110 245 252 155 235 79 161 138 251 254 254 11 46 196 173 166 11 1 0 0]
2022/02/07 18:48:25 GetRecords Data: [31 139 8 0 0 0 0 0 0 0 93 143 61 111 131 48 16 134 255 203 205 137 100 31 248 176 217 144 66 51 117 34 91 137 42 39 181 144 165 128 145 237 52 170 16 255 189 71 218 46 29 238 67 207 123 159 11 140 46 37 59 184 211 215 236 160 134 67 115 106 222 95 219 174 107 142 45 236 32 60 38 23 25 107 83 16 42 84 84 105 197 248 22 134 99 12 247 153 21 251 72 251 33 236 83 182 49 115 233 83 235 114 116 118 100 241 151 74 198 233 126 73 215 232 231 236 195 244 226 111 76 19 212 111 255 219 207 207 254 246 211 77 121 147 23 240 31 60 166 32 34 141 37 161 16 5 153 202 20 12 80 34 59 69 12 140 212 108 146 176 82 164 133 48 85 169 81 242 161 196 107 179 231 255 178 29 249 84 73 101 137 36 5 146 214 184 251 251 155 199 47 125 223 131 189 92 183 80 255 228 155 95 97 61 175 223 196 138 122 129 32 1 0 0]

別の投稿から、データの前に []byte を使用できることがわかりましたが、それでも機能しませんでした。

誰かが私を助けることができますか?

- - アップデート - -

おー!不思議ではありません!私が従ったチュートリアルには、実際にはバイトスライスを非整列化する行がありましたが、次のコードを使用すると:

for _, d := range records.Records {
    m := make(map[string]interface{})
    err := json.Unmarshal([]byte(d.Data), &m)
    if err != nil {
        log.Println(err)
        continue
    }
    log.Printf("GetRecords Data: %v\n", m)
    // log.Printf("GetRecords Data: %v\n", []byte(d.Data))
}

次のメッセージが返されましたが、これはエラーではありません。

2022/02/07 20:17:24 invalid character '\x1f' looking for beginning of value

この無効な問題を解決するにはどうすればよいですか?

- - アップデート - -

aws cli が機能していることを確認し、次のように base64 でデコードした後に出力を確認できました。

echo -n "H4sIAAAAAAAAADWPPU8DMQyG/0vmDrETO063kzg6MV03VKGjRFVQ70NJCkJV/zuGwujHfmy/VzOlWsdT2n+tyWzNQ7fvXp76Yeh2vdmY5XNORbFEx0hIHIQUn5fTriyXVTvv4zHdydBKGqc/BMrq5bUeS15bXubHfG6pVLN9vhuHX6X/SHP7gVeT39R0zCwRKDITIlmnhY8iICTOktgQfLCCEAEV+8BkGQD1WMsapI2T/gTsPUawbNHR5j+grm86QeZ2uH0DFU6ZLPYAAAA=" | base64 -d | zcat

ただし、go ソース コードを実行すると、同じ2022/02/07 23:00:03 無効な文字 '\x1f' 値の開始を探しているというメッセージが引き続き表示されます。

これが私が実行した完全なコードです:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

func main() {
    kinesisGetRecords()
}

func kinesisGetRecords() {
    // Load the Shared AWS Configuration (~/.aws/config)
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }

    // Create an Kinesis service client
    kc := kinesis.NewFromConfig(cfg)

    // Define stream name
    streamName := aws.String("jace")

    // Get the stream
    streams, err := kc.DescribeStream(context.TODO(), &kinesis.DescribeStreamInput{
        StreamName: streamName,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Retrieve iterator
    iteratorOutput, err := kc.GetShardIterator(context.TODO(), &kinesis.GetShardIteratorInput{
        ShardId:           aws.String(*streams.StreamDescription.Shards[0].ShardId),
        ShardIteratorType: "TRIM_HORIZON",
        StreamName:        streamName,
    })
    if err != nil {
        log.Panic(err)
    }

    // Display records
    shardIterator := iteratorOutput.ShardIterator
    var a *string

    // get data using infinity looping
    // we will attempt to consume data every 1 secons, if no data, nothing will be happen
    for {
        // get records use shard iterator for making request
        records, err := kc.GetRecords(context.TODO(), &kinesis.GetRecordsInput{
            ShardIterator: shardIterator,
        })

        // if error, wait until 1 seconds and continue the looping process
        if err != nil {
            time.Sleep(1000 * time.Millisecond)
            continue
        }

        // process the data
        if len(records.Records) > 0 {
            for _, d := range records.Records {
                m := make(map[string]interface{})
                err := json.Unmarshal([]byte(d.Data), &m)
                if err != nil {
                    log.Println(err)
                    continue
                }
                log.Printf("GetRecords Data: %v\n", m)
            }
        } else if records.NextShardIterator == a || shardIterator == records.NextShardIterator || err != nil {
            log.Printf("GetRecords ERROR: %v\n", err)
            break
        }
        shardIterator = records.NextShardIterator
        time.Sleep(1000 * time.Millisecond)
    }
}

誰か助けてくれませんか?

4

1 に答える 1