89

非常に大きなDataFrame(混合データ型の) グループに関数を適用する必要があり、複数のコアを利用したいと考えています。

グループから反復子を作成し、マルチプロセッシング モジュールを使用することはできますが、すべてのグループと関数の結果をプロセス間のメッセージングのためにピクルする必要があるため、効率的ではありません。

DataFrameピクルス化を回避したり、完全にコピーを回避したりする方法はありますか? マルチプロセッシングモジュールの共有メモリ機能はnumpy配列に限定されているようです。他のオプションはありますか?

4

1 に答える 1

12

上記のコメントから、これはしばらくの間計画されているようです(私が気付いpandasた興味深いrosettaプロジェクトもあります)。

ただし、すべての並列機能が に組み込まれるまでは、 + OpenMPと C++を直接使用しpandasて、効率的でメモリをコピーしない並列拡張を非常に簡単に作成できることに気付きました。pandascython

これは、並列の groupby-sum を記述する短い例であり、その使用法は次のようになります。

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

出力は次のとおりです。

     sum
key     
0      6
1      11
2      4

注:この単純な例の機能は、最終的に の一部になることは間違いありませんpandas。ただし、しばらくの間、C++ で並列化する方が自然な場合もあります。これを .NET に組み合わせるのがいかに簡単であるかを認識することが重要pandasです。


これを行うために、コードが続く単純な単一ソース ファイル拡張機能を作成しました。

いくつかのインポートと型定義から始まります

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C++unordered_map型はシングル スレッドでvector合計するためのもので、 はすべてのスレッドで合計するためのものです。

関数に移りますsum。高速アクセスのために、型付きメモリ ビューから始めます。

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

この関数は、スレッドをほぼ均等に分割し (ここでは 4 にハードコードされています)、各スレッドにその範囲内のエントリを合計させることによって続行します。

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

スレッドが完了すると、関数は (異なる範囲からの) すべての結果を単一の にマージしますunordered_map

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

あとは、 を作成しDataFrameて結果を返すだけです。

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
于 2015-05-21T10:18:25.933 に答える