7

Python環境のPandasモジュールを使って「ビッグデータ」のメモリベースリアルタイム計算モジュールを作っています。

したがって、応答時間はこのモジュールの品質であり、非常に重要かつ重要です。

大規模なデータ セットを処理するために、データを分割し、サブ分割データを並行して処理します。

サブデータの結果を格納する部分に時間がかかります(21行目)。

内部でメモリディープコピーが発生するか、渡されたサブデータがメモリ上で共有されていないと思います。

モジュールを C または C++ で記述した場合、以下のようにポインターまたは参照を使用します。

「プロセス=プロセス(ターゲット=addNewDerivedColumn、引数=[結果リスト、&sub_dataframe ])」

また

"プロセス = プロセス (ターゲット = addNewDerivedColumn、args = [resultList、sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe& ):.... "

メモリのディープ コピーを回避したり、マルチプロセッシングに費やす時間を短縮したりする良い方法はありますか? 「エレガントではない」は問題ありません。コードをダーティにする準備ができました。weekref、RawValue、RawArray、Value、Pool を試しましたが、すべて失敗しました。

このモジュールは MacOS で開発されており、最終的には Linux または Unix で実行される予定です。

Windows OS は考慮しないでください。

これがコードです。

実際のコードは私のオフィスにありますが、構造とロジックは実際のものと同じです。

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column']=    np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15     i think that the hole result of sub-dataframe is copied to resultList, not reference value 
16     and in here time spend much
17     compare elapsed time of comment 21th line with the uncommented one
18     In MS Windows, signifiant difference of elapsed time doesn't show up
19     In Linux or Mac OS, the difference is big
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)
4

2 に答える 2

9

プロセス間通信を最小限に抑えると、パフォーマンスが向上します。したがって、sub-DataFrame を引数として渡す代わりに、インデックス値を渡すだけです。サブプロセスは、共通の DataFrame 自体をスライスできます。

サブプロセスが生成されると、親プロセスの呼び出しモジュールで定義されたすべてのグローバルのコピーを取得します。したがって、マルチプロセッシング プールを生成する前にdf大規模な DataFrameがグローバルで定義されている場合、生成された各サブプロセスは にアクセスできます。df

がない Windows ではfork()、新しい python プロセスが開始され、呼び出しモジュールがインポートされます。したがって、Windows では、生成されたサブプロセスをdf最初から再生成する必要があり、これには時間と多くの追加メモリがかかる可能性があります。

ただし、Linux ではコピー オン ライトがあります。これは、生成されたサブプロセスが、元のグローバル (呼び出しモジュールの)をコピーせずにアクセスすることを意味します。サブプロセスがグローバルを変更しようとした場合にのみ、Linux は値が変更される前に別のコピーを作成します。

したがって、サブプロセスでグローバルを変更しないようにすると、パフォーマンスが向上します。サブプロセスは計算のみに使用することをお勧めします。計算の値を返し、メイン プロセスで結果を照合して元の DataFrame を変更します。

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
    start, end, arr = retval
    df.ix[start:end, 'new_column'] = arr

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])

if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])

    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')

    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())
于 2013-11-10T07:43:58.683 に答える
1

この質問と @unutbu の回答に触発されて、githubで並列バージョンのマップを作成しました。この関数は、複数のコアを持つ単一のマシンで読み取り専用の大きなデータ構造を無限に並列処理するのに適しています。基本的な考え方は @unutbu が推奨するものと似ており、一時的なグローバル変数を使用して大きなデータ構造 (データ フレームなど) を保持し、変数自体ではなくその「名前」をワーカーに渡します。しかし、これはすべて map 関数にカプセル化されているため、pathos パッケージの助けを借りて、標準の map 関数をほぼ完全に置き換えることができます。使用例は次のとおりです。

# Suppose we process a big dataframe with millions of rows.
size = 10**9
df = pd.DataFrame(np.random.randn(size, 4),
                  columns=['column_01', 'column_02', 
                           'column_03', 'column_04'])
# divide df into sections of 10000 rows; each section will be
# processed by one worker at a time
section_size = 10000
sections = [xrange(start, start+section_size) 
            for start in xrange(0, size, section_size)]

# The worker function that processes one section of the
# df. The key assumption is that a child 
# process does NOT modify the dataframe, but do some 
# analysis or aggregation and return some result.
def func(section, df):
    return some_processing(df.iloc[section])

num_cores = 4
# sections (local_args) specify which parts of a big object to be processed;
# global_arg holds the big object to be processed to avoid unnecessary copy;
# results are a list of objects each of which is the processing results 
# of one part of a big object (i.e., one element in the iterable sections) 
# in order.
results = map(func, sections, global_arg=df,
              chunksize=10, 
              processes=num_cores)

# reduce results (assume it is a list of data frames)
result = pd.concat(results)

私のテキスト マイニング タスクのいくつかでは、df をワーカー関数に直接渡す単純な並列実装は、大きなデータ フレームの高価なコピー操作のために、シングル スレッド バージョンよりもさらに遅くなります。ただし、上記の実装では、4 コアのタスクで 3 倍以上のスピードアップが得られます。これは、実際の軽量マルチスレッドにかなり近いようです。

于 2014-12-29T03:28:09.460 に答える