6

更新: Docker のポートに問題があることが判明しました。なぜこの現象が修正されたのかわかりません。

奇妙なエラーに遭遇したと思います。私はSaramaライブラリを使用しており、コンシューマーを正常に作成できます。

func main() {
 config = sarama.NewConfig()
 config.ClientID = "go-kafka-consumer"
 config.Consumer.Return.Errors = true
 // Create new consumer
 master, err := sarama.NewConsumer("localhost:9092", config)
 if err != nil {
    panic(err)
 }

 defer func() {
     if err := master.Close(); err != nil {
         panic(err)
     }
 }()

 partitionConsumer, err := master.ConsumePartition("myTopic",0, 
 sarama.OffsetOldest)
 if err != nil {
     panic(err)
 }
}

このコードを分割してメイン ルーチンの外に移動するとすぐに、次のエラーが発生します。

kafka: クライアントが通信可能なブローカーを使い果たしました (クラスターに到達できますか?)

コードを次のように分割しました。以前の main() メソッドは、NewConsumer() というメソッドを持つコンシューマ パッケージに変換し、新しい main() は次のように NewConsumer() を呼び出します。

c := consumer.NewConsumer()

パニックステートメントが次の行でトリガーされ、sarama.NewConsumer出力されますkafka: client has run out of available brokers to talk to (Is your cluster reachable?)

このようにコードを分割すると、Sarama がコンシューマを作成できなくなるのはなぜですか? Sarama はメインから直接実行する必要がありますか?

4

1 に答える 1

3

この方法で、1 つのグループにグループ化される 2 つ以上のコンシューマーを作成すると思います(おそらくgo-kafka-consumer)。ブローカーには 1 つのパーティションを持つトピックがあるため、グループの 1 つが割り当てられ、もう 1 つがこのエラー メッセージを生成します。そのトピックのパーティションを 2 に上げると、エラーはなくなります。しかし、あなたの問題は、どういうわけか以前よりも多くの消費者をインスタンス化したことだと思います。

一言で言えばカフカから:

コンシューマーは、特定のトピックのコンシューマー グループに編成することもできます。グループ内の各コンシューマーは一意のパーティションから読み取り、グループ全体でトピック全体からすべてのメッセージを消費します。パーティションよりも多くのコンシューマーがある場合、一部のコンシューマーは読み取るパーティションがないためアイドル状態になります。コンシューマーよりも多くのパーティションがある場合、コンシューマーは複数のパーティションからメッセージを受け取ります。コンシューマーとパーティションの数が等しい場合、各コンシューマーは 1 つのパーティションから順番にメッセージを読み取ります。

それらは正確にエラーを生成しないため、Sarama の問題になります。

于 2019-04-26T13:08:49.910 に答える