unique
マスクしてから、1 つの列に操作を適用しようとしています。私が使用しているコードの簡略版を以下に報告します。
import numpy as np
import pandas as pd
import dask.dataframe as dd
data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)
mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()
results = dask.compute([unique_true, unique_false])
この簡単な例は問題なく動作します。私の実際のデータは~5000
列で構成されており、1 つの列はフィルター処理に使用され、もう 1 つの列は一意の ID を取得するために使用されます。データは200
寄木細工のパーティションに保存され、これらのパーティションのそれぞれの重みは 9MB ですが、メモリにロードされると ( ddf.get_partition(0).compute().info()
) weights ~5GB
. 私はRAMを持っているので、パーティション400GB
をロードできると思います80
(おそらく他の操作のオーバーヘッドを考えると少ないでしょう)。ダッシュボードから、dask が一度にすべてのタスクを実行しようとしていることがわかります (メモリ内のタスクは常に同じで、ワーカーの数は関係ありません)。
パーティションの処理にかかる時間をテストするためにこれを書きました。
start = time.time()
df = ddf.get_partition(0).compute()
mask = df['data'] > 0
unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()
print(time.time() - start)
それは周りにかかり、RAMの60s
周りを必要とします。7GB
ProcessPool を開始50
し、一度にパーティションのみを実行していると仮定すると、4-5
数分かかります。
Dask のコアが単一のパーティションで行ったこととまったく同じことを知っているので、私の質問は、なぜ Dask が一度に 1 つずつではなく、すべてのタスクを並行して実行しようとするのかということです。タスクの実行を制限する方法はありますか? そして、これはここでの本当の問題ですか、それとも何か不足していますか?
タスクの実行を制限するために、ここでいくつかの質問を見つけました。ここのすべてのポイント: https://distributed.dask.org/en/latest/resources.html . ただし、この動作を強制するのではなく、Dask に最善を尽くさせるべきだと思います。また、それぞれ 80 GB の RAM を使用してシングル スレッドで 5 つのワーカーを設定すると、Dask がコードを実行できることにも言及する必要があります (ただし、前述のプロセス プール メソッドでかかる時間よりもはるかに多くの時間がかかります)。
私は python3.6.10
と Daskを使用して2.17.2
います。