使用事例:
- プロデューサー: トピックで Kafka に書き込みます
db.inventory.customers
- ConsumerGroup1 (cg1): 読み取り
db.inventory.customers
と書き込みloader-b.inventory.customers
- ConsumerGroup2 (cg2):
loader-b.inventory.customers
Github から読み書きします。
ラグを監視し、いくつかの作業を行う
cg1ラグとcg2ラグを監視します。ラグが0 <= lag <= 100
両方のコンシューマ グループの範囲内にある場合、何らかのタスクを実行します。
問題
問題は、スループットが低いローダー トピックで、cg2 が消えるため、そのラグがわからず、-1 と見なします。私たちの条件が満たされることはなく、立ち往生しています。
0 <= lag <= 100
ここで、cg1 と cg2-1 <= lag <= 100
の条件を考えると、
次に、cg2 が作成されていない最初の実行でも、条件が満たされたと見なされます。しかし、私たちはそれを望んでいません。いくつかの作業を行ってから、ラグが発生する必要があります。
私は何をすべきか?
コード
func (t *kafkaWatch) consumerGroupLag(
id string,
topic string,
partition int32,
broker *sarama.Broker,
) (
int64,
error,
) {
defaultLag := int64(-1)
lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return defaultLag, fmt.Errorf("Error getting offset for topic partition: %s, err: %v", topic, err)
}
offsetFetchRequest := sarama.OffsetFetchRequest{
ConsumerGroup: id,
Version: 1,
}
offsetFetchRequest.AddPartition(topic, partition)
err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return defaultLag, fmt.Errorf("Error opening broker connection again, err: %v", err)
}
offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
return defaultLag, fmt.Errorf(
"Error fetching offset for offsetFetchRequest: %s %v, err: %v",
topic, offsetFetchRequest, err)
}
if offsetFetchResponse == nil {
return defaultLag, fmt.Errorf(
"OffsetFetch request got no response for request: %+v",
offsetFetchRequest)
}
for topicInResponse, partitions := range offsetFetchResponse.Blocks {
if topicInResponse != topic {
continue
}
for partitionInResponse, offsetFetchResponseBlock := range partitions {
if partition != partitionInResponse {
continue
}
// Kafka will return -1 if there is no offset associated
// with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset == -1 {
klog.V(4).Infof("%s not consumed by group: %v", topic, id)
return defaultLag, nil
}
if offsetFetchResponseBlock.Err != sarama.ErrNoError {
return defaultLag, fmt.Errorf(
"Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",
offsetFetchResponseBlock.Err)
}
return lastOffset - offsetFetchResponseBlock.Offset, nil
}
}
klog.Warningf("%s for group is not active or present in Kafka", topic)
return defaultLag, nil
}