49

最近、使いやすい Python 並列処理モジュールを目指したdaskモジュールを見つけました。私にとっての大きなセールスポイントは、パンダで動作することです。

マニュアルページを少し読んだ後、この簡単に並列化可能なタスクを実行する方法が見つかりません。

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

現時点では、これをダスクで達成するために、AFAIK、

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

これは醜い構文であり、実際には完全よりも遅いです

df.apply(func, axis = 1) # for pandas DF row apply

なにか提案を?

編集:マップ機能について@MRocklinに感謝します。普通のパンダが適用するよりも遅いようです。これは pandas GIL リリースの問題に関連していますか、それとも間違っていますか?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
4

2 に答える 2

72

map_partitions

関数を使用して、データフレームのすべてのパーティションに関数を適用できますmap_partitions

df.map_partitions(func, columns=...)

func は一度にデータセットの一部のみを与えられることに注意してください.withのようにデータセット全体ではありpandas applyません.

map/apply

シリーズ全体で関数を行ごとにマップできますmap

df.mycolumn.map(func)

を使用して、データフレーム全体で関数を行ごとにマップできますapply

df.apply(func, axis=1)

スレッドとプロセス

バージョン 0.6.0 以降dask.dataframes、スレッドで並列化されます。カスタム Python 関数は、スレッドベースの並列処理からあまり恩恵を受けません。代わりにプロセスを試すことができます

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

しかし避けるapply

ただし、applyPandas と Dask の両方でカスタム Python 関数を使用することは避けるべきです。多くの場合、これがパフォーマンス低下の原因となります。ベクトル化された方法で操作を行う方法を見つけた場合、Pandas コードが 100 倍高速になり、dask.dataframe がまったく必要なくなる可能性があります。

検討numba

特定の問題については、numba. これにより、パフォーマンスが大幅に向上します。

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

免責事項、私は開発者の多くを採用してnumbaいる会社で働いています。daskpandas

于 2015-07-12T03:35:33.483 に答える