9

parquet 形式のやや大きな (〜 20 GB) パーティション分割されたデータセットがあります。を使用して、データセットから特定のパーティションを読み取りたいと思いpyarrowます。でこれを達成できると思ってpyarrow.parquet.ParquetDatasetいましたが、そうではないようです。ここに私が欲しいものを説明するための小さな例があります。

ランダム データセットを作成するには:

from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset


def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a paritioned dataset

    data
    ├── part1=foo
    │   └── part2=True
    ├── part1=foo
    │   └── part2=False
    ├── part1=bar
    │   └── part2=True
    └── part1=bar
        └── part2=False

    """
    path_tmpl = '/'.join(['{}={}'] * len(partitions))  # part=value
    path_tmpl = '{}/{}'.format(basepath, path_tmpl)    # part1=val/part2=val

    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()

パーティション 1 のすべての値を読み取り、パーティション 2 の True のみを読み取りたいです。これpandas.read_parquetは不可能であり、常に列全体を読み取る必要があります。私は次のことを試しましたpyarrow

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()

それもうまくいきません:

>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')

私はこれを次のpysparkように簡単に行うことができます:

def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)


spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()

以下に示すように:

>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')

または でこれを行うことができますpyarrowpandas、それともカスタム実装が必要ですか?

更新: Wes のリクエストにより、これは現在JIRAにあります。

4

2 に答える 2