0

バックグラウンド

プロデューサーはいくつかのデータを生成し、次のように順番に Kafka に送信します。

{uuid: 123 ステータス: 1}

{uuid: 123 ステータス: 3}

ステータス 1 は開始を意味します

ステータス 3 は成功を意味します

私はsarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler)を使用してコードを消費します。

func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {

        key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
        _, err := rdb.RedisClient.Get(h.ctx, key).Result()
        if err == redis.Nil {
            msgQueue <- msg.Value
            sess.MarkMessage(msg, "")
            rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
        } else if err != nil {
            log.Errorln("RedisClient get key error : ", err)
            return err
        } else {
            continue
        }

    }
    return nil
}

msgQueue := make(chan interface{}, 1000)

次に、msgQueue の値を構造体にデコードし、レコードを mysql に挿入します。

質問

通常、最終的なデータステータスは '3'ですが、時々'1' になることがあります。

また、チャネルmsgQueueのメッセージの順序が固定されていないことがわかりました。

では、データの最終ステータスが3であることを確認するにはどうすればよいでしょうか?

直し方

どのように最適化できるかを確認するには不十分な方法を提供しました。

    conn := &gorm.DB{}
    data := &Log{}
    if data.Status != 1 {
        conn = conn.Clauses(clause.OnConflict{
            Columns:   []clause.Column{{Name: "uuid"}},
            DoUpdates: clause.AssignmentColumns([]string{"status"}),
        })
    }
    conn.Create(data)
    return conn.Error

また、mysql にはuuidの一意の制約インデックスがあります。

データの順序が{uuid: 123 status: 1}{uuid: 123 status: 3}の場合、そうです。

データの順序が{uuid: 123 status: 3}{uuid: 123 status: 1}の場合、最終ステータスも正しいですが、エラーError 1062: Duplicate entry '123' for key 'unique_index_uuid'が返されます。

美しくない。では、どうすれば最適化できますか、またはそれを行う他の方法はありますか?

4

1 に答える 1

1

これは、トピック パーティションによって異なります。Kafka はトピック内での順序保証を提供せず、パーティション内でのみ提供します。

つまり、メッセージ A を送信してからメッセージ Bをパーティション 0に送信した場合、最初に A、次に B という順序になります。 A が書き込まれる前。

以下は、Confluent の Web サイトからの引用です。

Kafka は、トピック内の異なるパーティション間ではなく、パーティション内のレコードの合計順序のみを提供します。ほとんどのアプリケーションでは、データをキーで分割する機能と組み合わせたパーティションごとの順序付けで十分です。ただし、レコード全体の順序が必要な場合は、パーティションが 1 つだけのトピックでこれを実現できますが、これはコンシューマー グループごとに 1 つのコンシューマー プロセスのみを意味します。

リンク

于 2021-10-25T11:53:57.200 に答える