kafka1.1.0 のトピックを管理するために sarama(1.27) ClusterAdmin を使用しています。kafka トピックを管理する私のアプリケーションは、REST サービスとして実行されています。私のアプリケーションはしばらくの間正常に動作し、トピックを取得/作成/削除できます。
しかし、アクティビティがない状態でしばらく時間が経過すると、新しいトピック リクエストでエラーが発生します。
このHow to fix broker may not be available after broken pipe に遭遇しました。
私のアプリケーションはサービスとして実行されているため、パイプの破損の問題を防ぐにはどうすればよいですか? アプリケーションが終了するときだけ、ClusterAdmin を閉じます。すべての要求を処理するために、同じ ClusterAdmin 接続が使用されます。何らかの理由で nil の場合は、リクエストごとに clusterAdmin を再初期化します (通常、最初の初期化後は nil ではないため、同じ接続が再利用されます)。
各リクエストが処理された後に clusteradmin を閉じて、各トピック リクエストに対して NewClusterAdmin() を開く必要がありますか、それともキープアライブ オプションを使用する必要がありますか?
これが私の既存のコードです:
if admin == nil{
admin, err := NewClusterAdmin([]string{"localhost:9092"}, s.config)
..
}
topicMetadata, err := admin.DescribeTopics([]string{topicName})