Python環境のPandasモジュールを使って「ビッグデータ」のメモリベースリアルタイム計算モジュールを作っています。
したがって、応答時間はこのモジュールの品質であり、非常に重要かつ重要です。
大規模なデータ セットを処理するために、データを分割し、サブ分割データを並行して処理します。
サブデータの結果を格納する部分に時間がかかります(21行目)。
内部でメモリディープコピーが発生するか、渡されたサブデータがメモリ上で共有されていないと思います。
モジュールを C または C++ で記述した場合、以下のようにポインターまたは参照を使用します。
「プロセス=プロセス(ターゲット=addNewDerivedColumn、引数=[結果リスト、&sub_dataframe ])」
また
"プロセス = プロセス (ターゲット = addNewDerivedColumn、args = [resultList、sub_dataframe])
def addNewDerivedColumn(resultList, split_sub_dataframe& ):.... "
メモリのディープ コピーを回避したり、マルチプロセッシングに費やす時間を短縮したりする良い方法はありますか? 「エレガントではない」は問題ありません。コードをダーティにする準備ができました。weekref、RawValue、RawArray、Value、Pool を試しましたが、すべて失敗しました。
このモジュールは MacOS で開発されており、最終的には Linux または Unix で実行される予定です。
Windows OS は考慮しないでください。
これがコードです。
実際のコードは私のオフィスにありますが、構造とロジックは実際のものと同じです。
1 #-*- coding: UTF-8 -*-'
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9
10 split_sub_dataframe['new_column']= np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11
12 print split_sub_dataframe.head()
13
14 '''
15 i think that the hole result of sub-dataframe is copied to resultList, not reference value
16 and in here time spend much
17 compare elapsed time of comment 21th line with the uncommented one
18 In MS Windows, signifiant difference of elapsed time doesn't show up
19 In Linux or Mac OS, the difference is big
20 '''
21 resultList.append(split_sub_dataframe)
22
23
24
25 if __name__ == "__main__":
26
27 # example data generation
28 # the record count of the real data is over 1 billion with about 10 columns.
29 dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30
31
32 print 'start...'
33 start_time = time.time()
34
35 # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36 split_dataframe_list = np.array_split(dataframe, 5)
37
38 # multiprocessing
39 manager = Manager()
40
41 # result list
42 resultList=manager.list()
43 processList=[]
44
45 for sub_dataframe in split_dataframe_list:
46 process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47 processList.append(process)
48
49 for proc in processList:
50 proc.start()
51 for proc in processList:
52 proc.join()
53
54
55 print 'elapsed time : ', np.round(time.time() - start_time,3)