皆様からのご回答ありがとうございました。
私は提案された問題の解決策(「その」解決策ではない)を作成しました。他の人がそれを役立つと思うかもしれないので、ここにコードを投稿します。私の解決策は、AdamMatanによって提案されたオプション1と3のハイブリッドです。コードには、viセッションの行番号が含まれています。これは、以下の説明に役立ちます。
12 # System libraries needed by this module.
13 import numpy, multiprocessing, time
14
15 # Third-party libraries needed by this module.
16 import labeledMatrix
17
18 # ----- Begin code for this module. -----
19 from commonFunctions import debugMessage
20
21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
22 exceptionType=ValueError, useNumType=numpy.float, verbose=False,
23 maxProcesses=None, processCheckTime=1.0 ):
24 """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
25 parsed by [fvFileParser].
26 [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
27
28 If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
29
30 [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
31 [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
32 [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
33
34 Return: a LabeledNumericMatrix with corresponding row and column IDs."""
35
36 # Setup row/col ID information.
37 useColIDs = list( colIDs )
38 useRowIDs = rowIDs or useColIDs
39 featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
40 verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
41 featureIDs = featureData.keys()
42 absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
43 if absentIDs:
44 raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
45 # Otherwise, proceed to creation of matrix.
46 resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
47 calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
48
49 # Setup data structures needed for parallelization.
50 numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
51 assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
52 dataManager = multiprocessing.Manager()
53 sharedFeatureData = dataManager.dict( featureData )
54 resultQueue = multiprocessing.Queue()
55 # Assign jobs evenly through number of processors available.
56 jobList = [ list() for i in range(numSubprocesses) ]
57 calculationNumber = 0 # Will hold total number of results stored.
58 if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
59 remainingIDs = list( useRowIDs )
60 while remainingIDs:
61 firstID = remainingIDs[0]
62 for secondID in remainingIDs:
63 jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
64 calculationNumber += 1
65 remainingIDs.remove( firstID )
66 else: # Straight processing one at a time.
67 for rowID in useRowIDs:
68 for colID in useColIDs:
69 jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
70 calculationNumber += 1
71
72 verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
73 # Define a function to perform calculation and store results
74 def runJobs( scoreFunc, pairs, featureData, resultQueue ):
75 for pair in pairs:
76 score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
77 resultQueue.put( ( pair, score ) )
78 verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
79
80
81 # Create processes to perform parallelized computing.
82 processes = list()
83 for num in range(numSubprocesses):
84 processes.append( multiprocessing.Process( target=runJobs,
85 args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
86 # Launch processes and wait for them to all complete.
87 import Queue # For Queue.Empty exception.
88 for p in processes:
89 p.start()
90 assignmentsCompleted = 0
91 while assignmentsCompleted < calculationNumber:
92 numActive = [ p.is_alive() for p in processes ].count( True )
93 verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
94 ( assignmentsCompleted, calculationNumber, numActive ) )
95 while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
96 try:
97 pair, score = resultQueue.get( block=False )
98 resultMatrix[ pair[0], pair[1] ] = score
99 assignmentsCompleted += 1
100 if calculateSymmetric:
101 resultMatrix[ pair[1], pair[0] ] = score
102 except Queue.Empty:
103 break
104 if numActive == 0: finished = True
105 else:
106 time.sleep( processCheckTime )
107 # Result queue emptied and no active processes remaining - completed calculations.
108 return resultMatrix
109 ## end of createSimilarityMatrix()
36〜47行目は、元の質問の一部であった問題の定義に関連する単なる予備的なものです。cPythonのGILを回避するためのマルチプロセッシングのセットアップは49〜56行目で、57〜70行目は細分化されたタスクを均等に作成するために使用されます。行/列IDのリストが40,000程度に達すると、製品が大量のメモリを消費することになるため、itertools.productの代わりに57〜70行目のコードが使用されます。
実行される実際の計算は74〜78行目であり、ここではID->vectorエントリの共有ディクショナリと共有結果キューが使用されます。
行81〜85は、実際にはまだ開始されていませんが、実際のProcessオブジェクトをセットアップします。
私の最初の試み(ここには示されていません)では、「try ... resultQueue.get()andassignただし...」コードは実際には外部制御ループの外側にありました(すべての計算が終了したわけではありません)。そのバージョンのコードを9x9マトリックスの単体テストで実行したところ、問題はありませんでした。ただし、200x200以上に移動すると、実行間でコードを変更しなかったにもかかわらず、このコードがハングすることがわかりました。
このディスカッション(http://bugs.python.org/issue8426)とマルチプロセスの公式ドキュメントによると、基盤となる実装のパイプ/ソケットサイズが非常に大きくない場合、multiprocess.Queueの使用がハングする可能性があります。したがって、私のソリューションとしてここに示したコードは、プロセスの完了を確認しながら定期的にキューを空にし(91〜106行目を参照)、子プロセスが新しい結果を入力し続け、パイプがいっぱいになるのを防ぎます。
1000x1000のより大きな行列でコードをテストしたとき、計算コードがキューおよび行列割り当てコードよりもかなり前に終了していることに気付きました。cProfileを使用すると、1つのボトルネックがデフォルトのポーリング間隔processCheckTime = 1.0(23行目)であり、この値を下げると結果の速度が向上することがわかりました(タイミングの例については投稿の下部を参照してください)。これは、Pythonでのマルチプロセッシングに不慣れな他の人々にとって役立つ情報かもしれません。
全体として、これはおそらく可能な限り最良の実装ではありませんが、さらなる最適化の出発点を提供します。よく言われるように、並列化による最適化には適切な分析と思考が必要です。
タイミングの例、すべて8個のCPUを使用。
200x200(20100計算/割り当て)
t = 1.0:実行時間18秒
t = 0.01:実行時間3秒
500x500(125250の計算/割り当て)
t = 1.0:実行時間86秒
t = 0.01:実行時間23秒
誰かがコードをコピーして貼り付けたい場合に備えて、これは私が開発の一部に使用した単体テストです。明らかに、ラベル付けされたマトリックスクラスコードはここにはなく、指紋リーダー/スコアラーコードは含まれていません(ただし、独自のコードを作成するのは非常に簡単です)。もちろん、誰かを助けてくれるなら、そのコードも共有できてうれしいです。
112 def unitTest():
113 import cStringIO, os
114 from fingerprintReader import MismatchKernelReader
115 from fingerprintScorers import FeatureVectorLinearKernel
116 exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117 exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK" + os.linesep )
118 exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119 exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120 exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121 exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122 exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123 exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124 exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125 exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126 exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127 exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128 columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129 "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130 m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131 verbose=True, )
132 m.SetOutputPrecision( 6 )
133 print m
134
135 ## end of unitTest()