TLDR;

Is it possible to connect two pods in Kubernetes as if they were on the same local network with all ports open?

Motivation

We are currently implementing Airflow on a Kubernetes cluster and, since we aim to use TensorFlow Extended, we need to use Apache Beam. For our use case, Spark is the appropriate runner, and because Airflow and TensorFlow are coded in Python, we need to use Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).

Problem

Communication between the Airflow pod and the job server pod fails when submitting a job (possibly because of the random ports used by the job server).

Setup

Following good isolation practice, and to mimic a common Spark-on-Kubernetes setup (with the driver inside the cluster in a pod), the job server was implemented as:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  selector:
    matchLabels:
      app: beam-spark-job-server
  replicas: 1
  template:
    metadata:
      labels:
        app: beam-spark-job-server
    spec:
      restartPolicy: Always
      containers:
        - name: beam-spark-job-server
          image: apache/beam_spark_job_server:2.27.0
          args: ["--spark-master-url=spark://spark-master:7077"]
          resources:
            limits:
              memory: "1Gi"
              cpu: "0.7"
          env:
            - name: SPARK_PUBLIC_DNS
              value: spark-client
          ports:
          - containerPort: 8099
            protocol: TCP
            name: job-server
          - containerPort: 7077
            protocol: TCP
            name: spark-master
          - containerPort: 8098
            protocol: TCP
            name: artifact
          - containerPort: 8097
            protocol: TCP
            name: java-expansion
---
apiVersion: v1
kind: Service
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: beam-spark-job-server
  ports:
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion
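
A quick way to rule out basic reachability problems is to probe the Service's ports from inside the Airflow pod. This is a minimal sketch using only the Python standard library; the hostname `beam-spark-job-server` assumes the ClusterIP Service defined above, and a `False` result only means the TCP connection failed, not why:

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused, timed out, and DNS failures
        return False

if __name__ == "__main__":
    # Probe the job server's ports (names/ports taken from the Service above).
    for name, port in [("job-server", 8099), ("artifact", 8098), ("expansion", 8097)]:
        print(name, port, port_open("beam-spark-job-server", port))
```

If all three ports report reachable, the problem is more likely in how the artifact staging endpoint is advertised back to the client than in the Service itself.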

Development / Errors

Running this command from the Airflow pod:

python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=beam-spark-job-server:8099 --environment_type=LOOPBACK

produces no logs on the job server and the following error in the terminal:

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:46569
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 99, in <module>
    run()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 94, in run
ERROR:grpc._channel:Exception iterating requests!
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 195, in consume_request_iterator
    request = next(request_iterator)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 355, in __next__
    raise self._queue.get()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
    job_service_handle.submit(proto_pipeline)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
    prepare_response.staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
    staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
    for request in requests:
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
    debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"
>
    output | 'Write' >> WriteToText(known_args.output)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
    job_service_handle.submit(proto_pipeline)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
    prepare_response.staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
    staging_session_token)
  File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
    for request in requests:
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
    debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"

This indicates that an error occurred while submitting the job. Implementing the job server in the same pod as Airflow gives fully working communication between these two containers; I would like to have the same behavior, but with them in different pods.
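
For reference, the working same-pod arrangement can be sketched roughly as follows. This is a hypothetical minimal spec for illustration only (the Airflow image tag is an assumption); it simply places the Airflow container and the job server container side by side in one pod, so they reach each other over localhost:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-with-job-server
spec:
  containers:
    - name: airflow
      image: apache/airflow:2.0.1   # hypothetical tag; use your actual Airflow image
    - name: beam-spark-job-server
      image: apache/beam_spark_job_server:2.27.0
      args: ["--spark-master-url=spark://spark-master:7077"]
```

In this layout the client can use --job_endpoint=localhost:8099, and the artifact service on port 8098 is also reachable on localhost, which is consistent with the error above pointing at ipv4:127.0.0.1:8098.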
