メッセージをAmazon MSKにプッシュするためにsarama golang ライブラリを使用していました。これまで、私は msk バージョン 2.2.1 を使用していましたが、コードは正常に動作していましたが、現在は msk バージョンが 2.3.1 に変更されています。現在、メッセージをトピックにプッシュできません。
エラー:
パーティション -1
オフセット -1
要求は、このブローカーに存在しないトピックまたはパーティションに対するものでした。
コード:
func getKafkaEventClient() (sarama.Client, error) {
if !setupDone {
return nil, errors.New("Invalid setup")
}
if kafkaEventClient != nil {
return kafkaEventClient, nil
}
err := initKafkaEventClient()
if err != nil {
return nil, err
}
return kafkaEventClient, nil
}
func initKafkaEventClient() (err error) {
config := sarama.NewConfig()
config.Net.TLS.Enable = false
config.Producer.Return.Successes = true
config.Version = sarama.V0_10_0_0
brokers := strings.Split(kafkaEventHost, ",") //split the host into brokers
kafkaEventClient, err = sarama.NewClient(brokers, config)
if err != nil {
log.Println("initKafkaClient: failed to create new kafka client", err)
return
}
}
func PushMessageToKafka(message string) {
client, err := getKafkaEventClient()
if err != nil {
return
}
producer, err := sarama.NewSyncProducerFromClient(kafkaEventClient)
if err != nil {
fmt.Println("PushMessageToKafka: failed to get producer", err)
return
}
var msg sarama.ProducerMessage
msg.Topic = "some_topic"
msg.Value = sarama.StringEncoder("some_message")
p, o, err := producer.SendMessage(&msg)
fmt.Println("Partition", p)
fmt.Println("Offset", o)
if err != nil {
fmt.Println("PushMessageToKafka: failed to push message to be displayed", err)
}
}
sarama のバージョンも maxVersion に変更しましたconfig.Version = sarama.MaxVersion
が、Amazon MSK 2.3.1 では動作しません。
いくつかの解決策を提供してください。