5

ウィキペディアの CirrusSearch ダンプを、450G 16 コア GCP インスタンスのタイトルでインデックス付けされた Parquet でサポートされた dask データフレームに変換しようとしています。CirrusSearch ダンプは、単一の json 行形式のファイルとして提供されます。英語の Wipedia ダンプには 5M のリカードが含まれており、12G で圧縮され、90+G で拡張されています。重要な詳細は、レコードが完全にフラットではないということです。

これを行う最も簡単な方法は次のとおりです。

import json
import dask
from  dask import bag as db, dataframe as ddf
from  toolz import curried as tz
from toolz.curried import operator as op

blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'

source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .set_index('title', npartitions=npartitions)
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)

最初の問題は、デフォルトのスケジューラーでは 1 つのコアしか使用しないことです。この問題は、分散スケジューラまたはマルチプロセッシング スケジューラを明示的に使用することで回避できます。

私が試したすべてのスケジューラと設定のより大きな問題は、メモリ使用量です。インデックス作成時に、dask がデータフレーム全体をメモリにロードしようとしているようです。これには450Gでも十分なRAMではありません。

  • このタスクのメモリ使用量を減らすにはどうすればよいですか?
  • 試行錯誤をせずに必要な最小メモリを見積もるにはどうすればよいですか?
  • より良いアプローチはありますか?
4

1 に答える 1

4

Dask が 1 つのコアしか使用していないのはなぜですか?

これの JSON 解析部分はおそらく GIL に依存しているため、プロセスを使用する必要があります。ただし、最終的に何かを計算するときは、データフレームを使用しています。これは通常、計算によって GIL が解放されると想定しているため (これは Pandas では一般的です)、デフォルトでスレッド バックエンドを使用します。ほとんどが GIL 解析ステージに縛られている場合は、おそらくマルチプロセッシング スケジューラを使用することをお勧めします。これで問題が解決するはずです:

dask.config.set(scheduler='multiprocessing')

set_index フェーズ中のメモリ使用を回避するにはどうすればよいですか

はい、set_index の計算には完全なデータセットが必要です。これは難しい問題です。単一マシンのスケジューラを使用している場合 (実行しているように見えます)、このソート プロセスを実行するには、コア外のデータ構造を使用する必要があります。メモリが不足していることに驚いています。

試行錯誤をせずに必要な最小メモリを見積もるにはどうすればよいですか?

残念ながら、どの言語でもメモリ内の JSON に似たデータのサイズを見積もることは困難です。これは、フラット スキーマを使用するとはるかに簡単です。

より良いアプローチはありますか?

これはあなたの中心的な問題を解決するものではありませんが、すべてをソートする前に、データを Parquet 形式でステージングすることを検討してください。次にdd.read_parquet(...).set_index(...).to_parquet(...)、単独で実行してみてください。これは、一部のコストを分離するのに役立つ場合があります。

于 2018-06-29T11:34:23.517 に答える