問題タブ [dask-dataframe]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
187 参照

python - Dask Dataframe で set_index() を使用し、parquet に書き込むとメモリ爆発が発生する

列で並べ替えようとしている大きな一連の Parquet ファイルがあります。非圧縮のデータは約 14Gb であるため、Dask はこの仕事に適したツールのように思えました。私が Dask で行っていることは次のとおりです。

  1. 寄木細工ファイルの読み取り
  2. 列の 1 つ (「友人」と呼ばれる) での並べ替え
  3. 別のディレクトリに寄木細工のファイルとして書き込む

Daskプロセス(同期スケジューラを使用しているのは1つだけです)がメモリ不足になり、強制終了されなければ、これを行うことはできません。圧縮されていないパーティションが最大 300 MB を超えることはないため、これには驚かされます。

データセットの部分が徐々に大きくなるように Dask をプロファイリングするための小さなスクリプトを作成しましたが、Dask のメモリ消費量が入力のサイズに比例することに気付きました。スクリプトは次のとおりです。

visualize()呼び出しによって作成されたチャートは次のとおりです。

入力制限 = 2

input_limit=2 の場合

入力制限 = 4

input_limit=4 の場合

入力制限 = 8

input_limit=8 の場合

入力制限 = 16

input_limit=16 の場合

完全なデータセットは最大 50 個の入力ファイルなので、この増加率では、ジョブが 32 GB マシンのすべてのメモリを使い果たしてしまうことに驚きません。

私の理解では、Dask の要点は、メモリよりも大きなデータセットを操作できるようにすることです。人々が Dask を使用して、私の ~14 GB のものよりもはるかに大きなデータセットを処理しているという印象を受けます。メモリ消費量のスケーリングでこの問題を回避するにはどうすればよいでしょうか? ここで何が間違っていますか?

現時点では、別のスケジューラーや並列処理を使用することに興味はありません。Dask が必要以上に多くのメモリを消費している理由を知りたいだけです。

0 投票する
0 に答える
105 参照

python - Dask - 適切な数のタスクを効率的に実行する方法

uniqueマスクしてから、1 つの列に操作を適用しようとしています。私が使用しているコードの簡略版を以下に報告します。

この簡単な例は問題なく動作します。私の実際のデータは~5000列で構成されており、1 つの列はフィルター処理に使用され、もう 1 つの列は一意の ID を取得するために使用されます。データは200寄木細工のパーティションに保存され、これらのパーティションのそれぞれの重みは 9MB ですが、メモリにロードされると ( ddf.get_partition(0).compute().info()) weights ~5GB. 私はRAMを持っているので、パーティション400GBをロードできると思います80(おそらく他の操作のオーバーヘッドを考えると少ないでしょう)。ダッシュボードから、dask が一度にすべてのタスクを実行しようとしていることがわかります (メモリ内のタスクは常に同じで、ワーカーの数は関係ありません)。

パーティションの処理にかかる時間をテストするためにこれを書きました。

それは周りにかかり、RAMの60s周りを必要とします。7GBProcessPool を開始50し、一度にパーティションのみを実行していると仮定すると、4-5数分かかります。

Dask のコアが単一のパーティションで行ったこととまったく同じことを知っているので、私の質問は、なぜ Dask が一度に 1 つずつではなく、すべてのタスクを並行して実行しようとするのかということです。タスクの実行を制限する方法はありますか? そして、これはここでの本当の問題ですか、それとも何か不足していますか?

タスクの実行を制限するために、ここでいくつかの質問を見つけました。ここのすべてのポイント: https://distributed.dask.org/en/latest/resources.html . ただし、この動作を強制するのではなく、Dask に最善を尽くさせるべきだと思います。また、それぞれ 80 GB の RAM を使用してシングル スレッドで 5 つのワーカーを設定すると、Dask がコードを実行できることにも言及する必要があります (ただし、前述のプロセス プール メソッドでかかる時間よりもはるかに多くの時間がかかります)。

私は python3.6.10と Daskを使用して2.17.2います。

0 投票する
0 に答える
1764 参照

dask - 寄木細工のファイルからの Dask データフレーム: OSError: thrift を逆シリアル化できませんでした: TProtocolException: 無効なデータ

dask-ml によって提供されるクラスタリング アルゴリズムでダウンストリームで使用される Dask データフレームを生成しています。パイプラインの前のステップで、 を使用してディスクからデータフレームを読み取り、 を使用しdask.dataframe.read_parquetて変換を適用して列を追加しmap_partitions、 を使用して結果のデータフレームをディスクに書き戻しdask.dataframe.to_parquetます。この問題は、結果のデータフレームが再度読み取られてcompute()呼び出されるときに発生します。

次のコードを実行します。

次のトレースバックを生成します。

環境は Amazon Linux 2、Python 3.7.9、dask == 2.30.0、pyarrow == 2.0.0、pandas == 1.1.5、numpy == 1.19.4 です。dask データフレームは 404 列で構成され、約 14,000 の寄木細工のファイル (パーティション) から読み取られます。列のうち 4 つには type の項目が含まれobject(3 つには文字列が保持され、1 つには文字列のネストされたリストが保持されます)、残りの 400 には type が含まれますfloat64