単純な KubeFlow パイプラインをセットアップしようとしていますが、KubeFlow で機能する方法で依存関係をパッケージ化するのに問題があります。
コードは単純に構成ファイルをダウンロードして解析し、解析された構成を返します。
ただし、構成ファイルを解析するには、別の内部 python パッケージにアクセスできる必要があります。
同じプロジェクトのバケットでホストされているパッケージの.tar.gz
アーカイブがあり、パッケージの URL を依存関係として追加しましたが、というエラー メッセージが表示されますtarfile.ReadError: not a gzip file
。
ファイルが適切であることはわかっているので、バケットでのホスティングまたは kubeflow が依存関係をインストールする方法に関する中間的な問題です。
最小限の例を次に示します。
from kfp import compiler
from kfp import dsl
from kfp.components import func_to_container_op
from google.protobuf import text_format
from google.cloud import storage
import training_reader
def get_training_config(working_bucket: str,
working_directoy: str,
config_file: str) -> training_reader.TrainEvalPipelineConfig:
download_file(working_bucket, os.path.join(working_directoy, config_file), "ssd.config")
pipeline_config = training_reader.TrainEvalPipelineConfig()
with open("ssd.config", 'r') as f:
text_format.Merge(f.read(), pipeline_config)
return pipeline_config
config_op_packages = ["https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz",
"google-cloud-storage",
"protobuf"
]
training_config_op = func_to_container_op(get_training_config,
base_image="tensorflow/tensorflow:1.15.2-py3",
packages_to_install=config_op_packages)
def output_config(config: training_reader.TrainEvalPipelineConfig) -> None:
print(config)
output_config_op = func_to_container_op(output_config)
@dsl.pipeline(
name='Post Training Processing',
description='Building the post-processing pipeline'
)
def ssd_postprocessing_pipeline(
working_bucket: str,
working_directory: str,
config_file:str):
config = training_config_op(working_bucket, working_directory, config_file)
output_config_op(config.output)
pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)