0

kafka1.1.0 のトピックを管理するために sarama(1.27) ClusterAdmin を使用しています。kafka トピックを管理する私のアプリケーションは、REST サービスとして実行されています。私のアプリケーションはしばらくの間正常に動作し、トピックを取得/作成/削除できます。

しかし、アクティビティがない状態でしばらく時間が経過すると、新しいトピック リクエストでエラーが発生します。

このHow to fix broker may not be available after broken pipe に遭遇しました。

私のアプリケーションはサービスとして実行されているため、パイプの破損の問題を防ぐにはどうすればよいですか? アプリケーションが終了するときだけ、ClusterAdmin を閉じます。すべての要求を処理するために、同じ ClusterAdmin 接続が使用されます。何らかの理由で nil の場合は、リクエストごとに clusterAdmin を再初期化します (通常、最初の初期化後は nil ではないため、同じ接続が再利用されます)。

各リクエストが処理された後に clusteradmin を閉じて、各トピック リクエストに対して NewClusterAdmin() を開く必要がありますか、それともキープアライブ オプションを使用する必要がありますか?

これが私の既存のコードです:

if admin == nil{
        admin, err := NewClusterAdmin([]string{"localhost:9092"}, s.config)
        ..
}
topicMetadata, err := admin.DescribeTopics([]string{topicName})
4

1 に答える 1