dask-ml によって提供されるクラスタリング アルゴリズムでダウンストリームで使用される Dask データフレームを生成しています。パイプラインの前のステップで、 を使用してディスクからデータフレームを読み取り、 を使用しdask.dataframe.read_parquet
て変換を適用して列を追加しmap_partitions
、 を使用して結果のデータフレームをディスクに書き戻しdask.dataframe.to_parquet
ます。この問題は、結果のデータフレームが再度読み取られてcompute()
呼び出されるときに発生します。
次のコードを実行します。
# First step: make the Dask dataframe
ddf = ddf.map_partitions(partition_func) # in this case, perform a df.apply, then pandas.concat with the original
ddf_output_path = pathlib.Path("./data/") # Some directory
ddf_output_path.mkdir(parents=True, exist_ok=True)
dask.dataframe.to_parquet(ddf, ddf_output_path) # Succeeds
# Second step: attempt to read and compute on the Dask dataframe
ddf = dask.dataframe.read_parquet(ddf_output_path)
print(ddf.columns) # Produces the correct output
print(ddf.shape[0].compute()) # <-- fails here, for example
次のトレースバックを生成します。
File "/home/ec2-user/pycharm_remote/pipeline/perform_clustering.py", line 32, in run
print(ddf.shape[0].compute())
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 486, in get_async
raise_exception(exc, tb)
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 316, in reraise
raise exc
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/local.py", line 222, in execute_task
result = _execute_task(task, data)
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in read_parquet_part
dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 276, in <listcomp>
dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 758, in read_partition
piece, columns, partitions, **kwargs
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py", line 817, in _parquet_piece_as_arrow
**kwargs.get("read", {}),
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/pyarrow/parquet.py", line 719, in read
table = reader.read_row_group(self.row_group, **options)
File "/home/ec2-user/project/virtualenv/lib64/python3.7/site-packages/pyarrow/parquet.py", line 272, in read_row_group
use_threads=use_threads)
File "pyarrow/_parquet.pyx", line 1080, in pyarrow._parquet.ParquetReader.read_row_group
File "pyarrow/_parquet.pyx", line 1099, in pyarrow._parquet.ParquetReader.read_row_groups
File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Couldn't deserialize thrift: TProtocolException: Invalid data
Deserializing page header failed.
環境は Amazon Linux 2、Python 3.7.9、dask == 2.30.0、pyarrow == 2.0.0、pandas == 1.1.5、numpy == 1.19.4 です。dask データフレームは 404 列で構成され、約 14,000 の寄木細工のファイル (パーティション) から読み取られます。列のうち 4 つには type の項目が含まれobject
(3 つには文字列が保持され、1 つには文字列のネストされたリストが保持されます)、残りの 400 には type が含まれますfloat64
。