When I use the Kafka Connect BigQuery Connector with the Strimzi Operator for Kubernetes, I get the following error:
com.google.cloud.bigquery.BigQueryException: Error getting access token for service account
...
java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
...
java.net.UnknownHostException: oauth2.googleapis.com
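Any ideas what could be causing this? I am running this behind a corporate proxy, but I have already added the proxy as environment variables.
Also, running curl oauth2.googleapis.com from inside the Kafka Connect cluster pod returns an HTML response containing Error 404 (Not Found), so the pod can reach the host through the proxy.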
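To compare what curl sees with what the JVM sees, a check along the following lines could be run inside the pod. This is only a rough sketch: the class name ProxyCheck and the helper jvmProxyFlags are hypothetical names made up for illustration, and it rests on the assumption that the connector's HTTP calls go through the JDK's default networking (the stack trace shows sun.net.www.protocol.https), which as far as I know reads the https.proxyHost / https.proxyPort system properties rather than the https_proxy environment variable.

```java
import java.net.InetAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.List;

public class ProxyCheck {

    /**
     * Turns a proxy URL such as http://host:3128 into the equivalent JVM flags.
     * Assumes the value includes a scheme (http://...), as in the env vars above.
     */
    public static String jvmProxyFlags(String proxyUrl) {
        URI uri = URI.create(proxyUrl);
        return "-Dhttps.proxyHost=" + uri.getHost() + " -Dhttps.proxyPort=" + uri.getPort();
    }

    public static void main(String[] args) {
        String target = "https://oauth2.googleapis.com/token";

        // What curl and similar command-line tools pick up.
        String envProxy = System.getenv("https_proxy");
        System.out.println("https_proxy env var : " + envProxy);

        // What the JDK's built-in HTTP client looks at by default.
        System.out.println("https.proxyHost     : " + System.getProperty("https.proxyHost"));
        System.out.println("https.proxyPort     : " + System.getProperty("https.proxyPort"));

        // The proxies the default selector would choose for the token endpoint.
        List<Proxy> proxies = ProxySelector.getDefault().select(URI.create(target));
        System.out.println("selected proxies    : " + proxies);

        // A direct (non-proxied) connection needs the pod itself to resolve the host.
        try {
            System.out.println("resolved            : " + InetAddress.getByName("oauth2.googleapis.com"));
        } catch (UnknownHostException e) {
            System.out.println("DNS lookup failed   : " + e.getMessage());
        }

        // Show the flags that would carry the env var's proxy into the JVM.
        if (envProxy != null && !envProxy.isEmpty()) {
            System.out.println("equivalent JVM flags: " + jvmProxyFlags(envProxy));
        }
    }
}
```

If the image ships a full JDK (11 or later), this can be run as a single file with java ProxyCheck.java. A "selected proxies" line showing only DIRECT together with a failed DNS lookup would match the UnknownHostException in the trace below.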
My Kubernetes setup looks like this:
KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: message-bus
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.0.0
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  image: docker.io/alexanghel23/kafka-connect-plugins:v0.2.0
  template:
    connectContainer:
      env:
        - name: https_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: http_proxy
          value: http://XXX.XXX.XX.XXX:3128
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /opt/kafka/external-configuration/gcp-credentials/kafka-bq.json
  externalConfiguration:
    volumes:
      - name: gcp-credentials
        secret:
          secretName: kafka-bq
  config:
    config.providers: env
    config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
KafkaConnector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: alerts-bq
  namespace: message-bus
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  tasksMax: 1
  config:
    project: ai4neo-dev
    defaultDataset: ".*=test_kafka"
    topics: alerts
    keySource: FILE
    keyfile: "/opt/kafka/external-configuration/gcp-credentials/kafka-bq.json"
    proxy.url: "http://xxx.xxx.xx.xxx:3128"
Service account:
{
  "type": "service_account",
  "project_id": "ai4neo-dev",
  "private_key_id": "81<omitted>1e",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEv<omitted>qpefw=\n-----END PRIVATE KEY-----\n",
  "client_email": "kafka-bq@ai4neo-dev.iam.gserviceaccount.com",
  "client_id": "10<omitted>21",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/kafka-bq%40ai4neo-dev.iam.gserviceaccount.com"
}
Full error:
2021-12-29 10:48:49,051 ERROR WorkerSinkTask{id=alerts-bq-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error getting access token for service account: oauth2.googleapis.com (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-alerts-bq-0]
com.google.cloud.bigquery.BigQueryException: Error getting access token for service account: oauth2.googleapis.com
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:113)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:285)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:678)
at com.google.cloud.bigquery.BigQueryImpl$17.call(BigQueryImpl.java:675)
at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:674)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.lambda$retrieveCachedTable$2(BigQuerySinkTask.java:338)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1133)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.retrieveCachedTable(BigQuerySinkTask.java:338)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordTable(BigQuerySinkTask.java:210)
at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error getting access token for service account: oauth2.googleapis.com
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444)
at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
at com.google.auth.oauth2.ServiceAccountCredentials.getRequestMetadata(ServiceAccountCredentials.java:603)
at com.google.auth.http.HttpCredentialsAdapter.initialize(HttpCredentialsAdapter.java:91)
at com.google.cloud.http.HttpTransportOptions$1.initialize(HttpTransportOptions.java:159)
at com.google.api.client.http.HttpRequestFactory.buildRequest(HttpRequestFactory.java:88)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:422)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:283)
... 22 more
Caused by: java.net.UnknownHostException: oauth2.googleapis.com
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:220)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:299)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:474)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:569)
at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:113)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:441)
Thank you!