2

dask.dataframe.map_partitionsマージなどの操作に使用すると魅力的な場合があります。一部のシナリオでは、 aleft_dfright_dfusingの間でマージを行う場合、マージを実行する前に基本的に事前キャッシュして、ネットワークのオーバーヘッド/ローカル シャッフルを削減しmap_partitionsたいと考えています。right_dfこれを行う明確な方法はありますか?client.scatter(the_df)client.run(func_to_cache_the_df)、またはその他のインテリジェントブロードキャストのいずれかまたは組み合わせで可能になるはずです。

これは、本質的にルックアップ テーブルでleft_dfあるはるかに小さいテーブルで、大きなテーブルで左結合を行うというコンテキストで特に顕著です。right_dfこれはright_dfメモリに読み込まれ、マージ前にすべてのワーカー/パーティションに永続化/分散されて、最後の最後までクロスパーティション通信の必要性を減らすことができるはずです。right_dfこれをうまく行うにはどうすれば分散できますか?

以下は、cuDF と Dask を使用したこの種の不均衡なマージの小さな例です (ただし、概念的には、これは pandas と Dask でも同じです)。

import pandas as pd
import cudf
import dask_cudf
import numpy as np
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# create a local CUDA cluster
cluster = LocalCUDACluster()
client = Client(cluster)

np.random.seed(12)

nrows_left = 1000000
nrows_right = 1000

left = cudf.DataFrame({'a': np.random.randint(0,nrows_right,nrows_left), 'left_value':np.arange(nrows_left)})
right = cudf.DataFrame({'a': np.arange(nrows_right), 'lookup_val': np.random.randint(0,1000,nrows_right)})

print(left.shape, right.shape) # (1000000, 2) (1000, 2)

ddf_left = dask_cudf.from_cudf(left, npartitions=500)
ddf_right = dask_cudf.from_cudf(right, npartitions=2)

def dask_merge(L, R):
    return L.merge(R, how='left', on='a')

result = ddf_left.map_partitions(dask_merge, R=ddf_right).compute()
result.head()
<cudf.DataFrame ncols=3 nrows=5 >
     a  left_value  lookup_val
0  219        1952         822
1  873        1953         844
2  908        1954         142
3  290        1955         810
4  863        1956         910

4

1 に答える 1