1

関数のような df.apply を実装しようとしていますが、データフレームのチャンク全体で並列化されています。次のテストコードを書いて、(データのコピーなどと比較して)どれだけ得られるかを確認しました。

from multiprocessing import Pool
from functools import partial
import pandas as pd
import numpy as np
import time

def df_apply(df, f):
    return df.apply(f, axis=1)

def apply_in_parallel(df, f, n=5):
    pool = Pool(n)
    df_chunks = np.array_split(df, n)
    apply_f = partial(df_apply, f=f)
    result_list = pool.map(apply_f, df_chunks)
    return pd.concat(result_list, axis=0)

def f(x):
  return x+1

if __name__ == '__main__':
  N = 10^8
  df = pd.DataFrame({"a": np.zeros(N), "b": np.zeros(N)})

  print "parallel"
  t0 = time.time()
  r = apply_in_parallel(df, f, n=5)
  print time.time() - t0

  print "single"
  t0 = time.time()
  r = df.apply(f, axis=1)
  print time.time() - t0

奇妙な動作: N=10^7 の場合、N=10^8 で機能し、エラーが発生します

Traceback (most recent call last):
  File "parallel_apply.py", line 27, in <module>
    r = apply_in_parallel(df, f, n=5)
  File "parallel_apply.py", line 14, in apply_in_parallel
    result_list = pool.map(apply_f, df_chunks)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
AttributeError: 'numpy.ndarray' object has no attribute 'apply'

ここで何が起こっているか知っている人はいますか?また、この並列化の方法に関するフィードバックをいただければ幸いです。個々の行と数百万行ごとに inc または sum よりも時間がかかる関数を期待しています。

ありがとう!

4

2 に答える 2