dask を使用して、メモリに収まらないデータセットを処理しようとしています。さまざまな「ID」の時系列データです。dask のドキュメントを読んだ後、「寄木細工」ファイル形式と「ID」によるパーティション分割を使用することにしました。
ただし、寄木細工から読み取ってインデックスを設定しているときに、「TypeError: 順序付けられたカテゴリを結合するには、すべてのカテゴリが同じでなければならない」というエラーが発生しましたが、これは自分で解決できませんでした。
このコードは、私が抱えている問題を再現しています:
import dask.dataframe as dd
import numpy as np
import pandas as pd
import traceback
# create ids
ids = ["AAA", "BBB", "CCC", "DDD"]
# create data
df = pd.DataFrame(index=np.random.choice(ids, 50), data=np.random.rand(50, 1), columns=["FOO"]).reset_index().rename(columns={"index": "ID"})
# serialize to parquet
f = r"C:/temp/foo.pq"
df.to_parquet(f, compression='gzip', engine='fastparquet', partition_cols=["ID"])
# read with dask
df = dd.read_parquet(f)
try:
df = df.set_index("ID")
except Exception as ee:
print(traceback.format_exc())
この時点で、次のエラーが表示されます。
~\.conda\envs\env_dask_py37\lib\site-packages\pandas\core\arrays\categorical.py in check_for_ordered(self, op)
1492 if not self.ordered:
1493 raise TypeError(
-> 1494 f"Categorical is not ordered for operation {op}\n"
1495 "you can use .as_ordered() to change the "
1496 "Categorical to an ordered one\n"
TypeError: Categorical is not ordered for operation max
you can use .as_ordered() to change the Categorical to an ordered one
私はそれからしました:
# we order the categorical
df.ID = df.ID.cat.as_ordered()
df = df.set_index("ID")
そして、 を使用しようとするとdf.compute(scheduler="processes")
、前に述べた TypeError が発生します。
try:
schd_str = 'processes'
aa = df.compute(scheduler=schd_str)
print(f"{schd_str}: OK")
except:
print(f"{schd_str}: KO")
print(traceback.format_exc())
与えます:
Traceback (most recent call last):
File "<ipython-input-6-e15c4e86fee2>", line 3, in <module>
aa = df.compute(scheduler=schd_str)
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\base.py", line 166, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\base.py", line 438, in compute
return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\base.py", line 438, in <listcomp>
return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\core.py", line 103, in finalize
return _concat(results)
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\core.py", line 98, in _concat
else methods.concat(args2, uniform=True, ignore_index=ignore_index)
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\methods.py", line 383, in concat
ignore_index=ignore_index,
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\methods.py", line 431, in concat_pandas
ind = concat([df.index for df in dfs])
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\methods.py", line 383, in concat
ignore_index=ignore_index,
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\dask\dataframe\methods.py", line 400, in concat_pandas
return pd.CategoricalIndex(union_categoricals(dfs), name=dfs[0].name)
File "C:\Users\xxx\.conda\envs\env_dask_py37\lib\site-packages\pandas\core\dtypes\concat.py", line 352, in union_categoricals
raise TypeError("Categorical.ordered must be the same")
TypeError: Categorical.ordered must be the same
驚くべきことに、 、 を使用するかdf.compute(scheduler="threads")
、df.compute(scheduler="synchronous")
またはインデックスをまったく設定しないと、適切に機能します。
ただし、これらのデータセットのいくつかを実際にマージしようとしており、インデックスを設定すると何も設定しないよりも高速になると考えていたため、これは私がすべきことではないようです。(この方法でインデックス付けされた 2 つのデータフレームをマージしようとすると、まったく同じエラーが発生します)
df._meta を検査しようとしましたが、私のカテゴリが「既知」であることがわかりました。タスクカテゴリ
私はまた、似ているように見えるが、どういうわけか解決策を見つけられなかったということについて、このgithubの投稿を読みました。
ご協力いただきありがとうございます、