2

列で並べ替えようとしている大きな一連の Parquet ファイルがあります。非圧縮のデータは約 14Gb であるため、Dask はこの仕事に適したツールのように思えました。私が Dask で行っていることは次のとおりです。

  1. 寄木細工ファイルの読み取り
  2. 列の 1 つ (「友人」と呼ばれる) での並べ替え
  3. 別のディレクトリに寄木細工のファイルとして書き込む

Daskプロセス(同期スケジューラを使用しているのは1つだけです)がメモリ不足になり、強制終了されなければ、これを行うことはできません。圧縮されていないパーティションが最大 300 MB を超えることはないため、これには驚かされます。

データセットの部分が徐々に大きくなるように Dask をプロファイリングするための小さなスクリプトを作成しましたが、Dask のメモリ消費量が入力のサイズに比例することに気付きました。スクリプトは次のとおりです。

import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ResourceProfiler, ProgressBar

def run(input_path, output_path, input_limit):
    dask.config.set(scheduler="synchronous")

    filenames = os.listdir(input_path)
    full_filenames = [os.path.join(input_path, f) for f in filenames]

    rprof = ResourceProfiler()
    with rprof, ProgressBar():
        df = dd.read_parquet(full_filenames[:input_limit])
        df = df.set_index("friend")
        df.to_parquet(output_path)

    rprof.visualize(file_path=f"profiles/input-limit-{input_limit}.html")

visualize()呼び出しによって作成されたチャートは次のとおりです。

入力制限 = 2

input_limit=2 の場合

入力制限 = 4

input_limit=4 の場合

入力制限 = 8

input_limit=8 の場合

入力制限 = 16

input_limit=16 の場合

完全なデータセットは最大 50 個の入力ファイルなので、この増加率では、ジョブが 32 GB マシンのすべてのメモリを使い果たしてしまうことに驚きません。

私の理解では、Dask の要点は、メモリよりも大きなデータセットを操作できるようにすることです。人々が Dask を使用して、私の ~14 GB のものよりもはるかに大きなデータセットを処理しているという印象を受けます。メモリ消費量のスケーリングでこの問題を回避するにはどうすればよいでしょうか? ここで何が間違っていますか?

現時点では、別のスケジューラーや並列処理を使用することに興味はありません。Dask が必要以上に多くのメモリを消費している理由を知りたいだけです。

4

1 に答える 1