バックグラウンド
プロデューサーはいくつかのデータを生成し、次のように順番に Kafka に送信します。
{uuid: 123 ステータス: 1}
{uuid: 123 ステータス: 3}
ステータス 1 は開始を意味します
ステータス 3 は成功を意味します
私はsarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler)を使用してコードを消費します。
func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
_, err := rdb.RedisClient.Get(h.ctx, key).Result()
if err == redis.Nil {
msgQueue <- msg.Value
sess.MarkMessage(msg, "")
rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
} else if err != nil {
log.Errorln("RedisClient get key error : ", err)
return err
} else {
continue
}
}
return nil
}
msgQueue := make(chan interface{}, 1000)
次に、msgQueue の値を構造体にデコードし、レコードを mysql に挿入します。
質問
通常、最終的なデータステータスは '3'ですが、時々'1' になることがあります。
また、チャネルmsgQueueのメッセージの順序が固定されていないことがわかりました。
では、データの最終ステータスが3であることを確認するにはどうすればよいでしょうか?
直し方
どのように最適化できるかを確認するには不十分な方法を提供しました。
conn := &gorm.DB{}
data := &Log{}
if data.Status != 1 {
conn = conn.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uuid"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
})
}
conn.Create(data)
return conn.Error
また、mysql にはuuidの一意の制約インデックスがあります。
データの順序が{uuid: 123 status: 1}、 {uuid: 123 status: 3}の場合、そうです。
データの順序が{uuid: 123 status: 3}、 {uuid: 123 status: 1}の場合、最終ステータスも正しいですが、エラーError 1062: Duplicate entry '123' for key 'unique_index_uuid'が返されます。
美しくない。では、どうすれば最適化できますか、またはそれを行う他の方法はありますか?