1

使用事例:

  • プロデューサー: トピックで Kafka に書き込みますdb.inventory.customers
  • ConsumerGroup1 (cg1): 読み取りdb.inventory.customersと書き込みloader-b.inventory.customers
  • ConsumerGroup2 (cg2): loader-b.inventory.customersGithub から読み書きします。

ラグを監視し、いくつかの作業を行う

cg1ラグとcg2ラグを監視します。ラグが0 <= lag <= 100両方のコンシューマ グループの範囲内にある場合、何らかのタスクを実行します。

問題

問題は、スループットが低いローダー トピックで、cg2 が消えるため、そのラグがわからず、-1 と見なします。私たちの条件が満たされることはなく、立ち往生しています。

0 <= lag <= 100ここで、cg1 と cg2-1 <= lag <= 100の条件を考えると、

次に、cg2 が作成されていない最初の実行でも、条件が満たされたと見なされます。しかし、私たちはそれを望んでいません。いくつかの作業を行ってから、ラグが発生する必要があります。

私は何をすべきか?

コード

func (t *kafkaWatch) consumerGroupLag(
    id string,
    topic string,
    partition int32,
    broker *sarama.Broker,
) (
    int64,
    error,
) {
    defaultLag := int64(-1)

    lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
    if err != nil {
        return defaultLag, fmt.Errorf("Error getting offset for topic partition: %s, err: %v", topic, err)
    }

    offsetFetchRequest := sarama.OffsetFetchRequest{
        ConsumerGroup: id,
        Version:       1,
    }
    offsetFetchRequest.AddPartition(topic, partition)

    err = broker.Open(t.client.Config())
    if err != nil && err != sarama.ErrAlreadyConnected {
        return defaultLag, fmt.Errorf("Error opening broker connection again, err: %v", err)
    }

    offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
    if err != nil {
        return defaultLag, fmt.Errorf(
            "Error fetching offset for offsetFetchRequest: %s %v, err: %v",
            topic, offsetFetchRequest, err)
    }
    if offsetFetchResponse == nil {
        return defaultLag, fmt.Errorf(
            "OffsetFetch request got no response for request: %+v",
            offsetFetchRequest)
    }

    for topicInResponse, partitions := range offsetFetchResponse.Blocks {
        if topicInResponse != topic {
            continue
        }

        for partitionInResponse, offsetFetchResponseBlock := range partitions {
            if partition != partitionInResponse {
                continue
            }
            // Kafka will return -1 if there is no offset associated
            // with a topic-partition under that consumer group
            if offsetFetchResponseBlock.Offset == -1 {
                klog.V(4).Infof("%s not consumed by group: %v", topic, id)
                return defaultLag, nil
            }
            if offsetFetchResponseBlock.Err != sarama.ErrNoError {
                return defaultLag, fmt.Errorf(
                    "Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",
                    offsetFetchResponseBlock.Err)
            }
            return lastOffset - offsetFetchResponseBlock.Offset, nil
        }
    }

    klog.Warningf("%s for group is not active or present in Kafka", topic)
    return defaultLag, nil
}
4

0 に答える 0