TL;DR
Is it possible to connect two pods in Kubernetes as if they were on the same local network, with all ports open?
Motivation
We are currently implementing Airflow in a Kubernetes cluster and, since we aim to use TensorFlow Extended, we need to use Apache Beam. For our use case Spark would be the appropriate runner, and since Airflow and TensorFlow are coded in Python, we would need to use Apache Beam's portable runner (https://beam.apache.org/documentation/runners/spark/#portability).
The problem
Communication between the Airflow pod and the job server pod fails when submitting a job (possibly because of the random ports used by the job server).
The setup
Following good isolation practices, and to mimic the common Spark-on-Kubernetes setup (with the driver inside the cluster, in a pod), the job server was implemented as follows:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  selector:
    matchLabels:
      app: beam-spark-job-server
  replicas: 1
  template:
    metadata:
      labels:
        app: beam-spark-job-server
    spec:
      restartPolicy: Always
      containers:
        - name: beam-spark-job-server
          image: apache/beam_spark_job_server:2.27.0
          args: ["--spark-master-url=spark://spark-master:7077"]
          resources:
            limits:
              memory: "1Gi"
              cpu: "0.7"
          env:
            - name: SPARK_PUBLIC_DNS
              value: spark-client
          ports:
            - containerPort: 8099
              protocol: TCP
              name: job-server
            - containerPort: 7077
              protocol: TCP
              name: spark-master
            - containerPort: 8098
              protocol: TCP
              name: artifact
            - containerPort: 8097
              protocol: TCP
              name: java-expansion
---
apiVersion: v1
kind: Service
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: beam-spark-job-server
  ports:
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion
Development/errors
python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=beam-spark-job-server:8099 --environment_type=LOOPBACK
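For reference, the same submission can be expressed as a short Python script with options mirroring the CLI flags above (the tiny Create input is a stand-in for wordcount's file input, just to keep the sketch self-contained):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=beam-spark-job-server:8099",
    "--environment_type=LOOPBACK",  # the runner calls back into this process
])

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.Create(["to be or not to be"])
        | beam.FlatMap(str.split)
        | beam.combiners.Count.PerElement()
        | beam.Map(print)
    )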
Executing the wordcount command above from the Airflow pod, I get no logs on the job server and the following error in the terminal:
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:46569
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 99, in <module>
    run()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 94, in run
ERROR:grpc._channel:Exception iterating requests!
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 195, in consume_request_iterator
    request = next(request_iterator)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 355, in __next__
    raise self._queue.get()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
    job_service_handle.submit(proto_pipeline)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
    prepare_response.staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
    staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
    for request in requests:
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
    debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"
>
    output | 'Write' >> WriteToText(known_args.output)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
    job_service_handle.submit(proto_pipeline)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
    prepare_response.staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
    staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
    for request in requests:
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
    debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"
This indicates an error while submitting the job. If I deploy the job server in the same pod as Airflow, I get perfectly working communication between the two containers; I would like to have this same behavior, but with them in different pods.
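From the "Listening for workers at localhost:46569" line above, my understanding is that with LOOPBACK the runner has to call back into the submitting process on a random localhost port, which would not be routable from another pod. Purely as an illustration of the alternative, and not something verified in this setup, a submission pinned to a fixed worker endpoint via Beam's EXTERNAL environment might look like the sketch below (airflow-worker-pool:50000 is an assumed service name and port, not something defined in the manifests above):

# Hypothetical variant: swap LOOPBACK's random localhost callback port for
# a worker pool on a fixed, routable address. "airflow-worker-pool:50000"
# is an assumption for illustration, not a service defined above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=beam-spark-job-server:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=airflow-worker-pool:50000",
])
# These options would then be passed to beam.Pipeline(options=options).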