ウィキペディアの CirrusSearch ダンプを、450G 16 コア GCP インスタンスのタイトルでインデックス付けされた Parquet でサポートされた dask データフレームに変換しようとしています。CirrusSearch ダンプは、単一の json 行形式のファイルとして提供されます。英語の Wipedia ダンプには 5M のリカードが含まれており、12G で圧縮され、90+G で拡張されています。重要な詳細は、レコードが完全にフラットではないということです。
これを行う最も簡単な方法は次のとおりです。
import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op
blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')
lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'
source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'
(
db
.read_text(source, blocksize=blocksize)
.map(json.loads)
.filter(tz.flip(op.contains, 'title'))
.to_dataframe()
.set_index('title', npartitions=npartitions)
.to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)
最初の問題は、デフォルトのスケジューラーでは 1 つのコアしか使用しないことです。この問題は、分散スケジューラまたはマルチプロセッシング スケジューラを明示的に使用することで回避できます。
私が試したすべてのスケジューラと設定のより大きな問題は、メモリ使用量です。インデックス作成時に、dask がデータフレーム全体をメモリにロードしようとしているようです。これには450Gでも十分なRAMではありません。
- このタスクのメモリ使用量を減らすにはどうすればよいですか?
- 試行錯誤をせずに必要な最小メモリを見積もるにはどうすればよいですか?
- より良いアプローチはありますか?