scipy.sparse.csr_matrix 形式の大きな疎行列 X があり、並列処理を利用してこれに numpy 配列 W を掛けたいと思います。いくつかの調査の後、プロセス間で X と W をコピーしないようにするために、マルチプロセッシングで配列を使用する必要があることを発見しました (たとえば、ここから: Python マルチプロセッシングで Pool.map を配列 (共有メモリ) と組み合わせる方法?およびIs shared readonly data copy to Python マルチプロセッシングの異なるプロセス? )。これが私の最新の試みです
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
ただし、出力は次のようになります: (4.431, 0.165) は、並列バージョンが非並列乗算よりもはるかに遅いことを示しています。
大きなデータをプロセスにコピーしているときに、同様の状況でスローダウンが発生する可能性があると思いますが、共有変数を格納するために Array を使用しているため、ここではそうではありません (numpy.frombuffer または csr_matrix の作成時に発生しない限り、しかし、csr_matrix を直接共有する方法が見つかりませんでした)。遅い速度のもう1つの考えられる原因は、各プロセスの各行列乗算の大きな結果を返すことですが、これを回避する方法はわかりません。
誰かが私が間違っているところを見ることができますか? 助けてくれてありがとう!
更新: 確信は持てませんが、プロセス間で大量のデータを共有することはそれほど効率的ではないと思います。理想的には、マルチスレッドを使用する必要があります (ただし、Global Interpreter Lock (GIL) により非常に困難になります)。これを回避する 1 つの方法は、たとえば Cython を使用して GIL をリリースすることです ( http://docs.cython.org/src/userguide/parallelism.htmlを参照)。ただし、多くの numpy 関数は GIL を通過する必要があります。