5

アップデート1.0開始

電話すると思われる

for i, Wi in enumerate(W.T):
    idx.append(i)
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))

関数に渡された引数ALS_Y/ALS_Xは参照ではなく、引数をコピーしました..したがって、XまたはYが非常にlarge matrixes、私の場合、それは6000*40またはそうです(そして、それは afor-loopであり、反復回数が であると仮定しましょう50 000...) 、メモリの制限を超えています。
そして、インデックスをパラメーターとして関数に渡すだけで、グローバル引数を使用してみました。

import multiprocessing
import time
import numpy as np

def func(idx):
    global a
    a[idx] += 1



if __name__ == "__main__":
    a=range(10)
    for j in xrange(2):
        pool = multiprocessing.Pool(processes=8)
        result = []
        for i in xrange(10):
            result.append(pool.apply_async(func, (i, )))
        pool.close()
        pool.join()
        print a
        print "Sub-process(es) done."

それは出力します: `

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Sub-process(es) done.

So, this means it still copiedあ`! さて、この問題を処理する方法はあるのでしょうか? 感謝!

アップデート1.0終了


以下は、行列因数分解の問題を解決するための Python のコードです。W = XY。ただし、以下のコードは効率が悪いので、並列版に変換できるといいのですが、GPU を使用するのが最適で、CPU も問題ありません。並列プログラミングの経験がないので、誰かアドバイスをもらえますか?

以下は、ALS を使用して行列を因数分解するコードです (交互最小二乗、詳細はこちら) 。

for ii in range(n_iterations):
    for u, Wu in enumerate(W):
        X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T #X_inner loop

    for i, Wi in enumerate(W.T):
        Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), #Y_inner loop
                                 np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))#Y_inner loop
    error = get_error(Q, X, Y, W)
    weighted_errors.append(error)
    print '{}th iteration is completed'.format(ii)

マルチプロセッシング ライブラリを使用した後、私のコードは次のようになります。

def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                           np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T  

for ii in range(n_iterations):
pool = multiprocessing.Pool(processes=12)#create pool
result = []#store each row for X
idx = []#store the row number
for u, Wu in enumerate(W):
    idx.append(u)
    result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
pool.close()
pool.join()
for u, vector in zip(idx, result):
    X[u] = vector.get()#assign the result to X
######################################
pool = multiprocessing.Pool(processes=12)#for Y, much similar to X
result = []
idx = []
for i, Wi in enumerate(W.T):
    idx.append(i)
    result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
pool.close()
pool.join()
for i, vector in zip(idx, result):
    Y[:,i]  = vector.get()
error = get_error(Q, X, Y, W)
weighted_errors.append(error)
print '{}th iteration is completed'.format(ii), 'error: ',error

しかし、やや悲惨なことに、プログラムは常に静かにクラッシュしました...

以下は私のコードの全体です..それはすべて乱雑です. ここでは行列をランダムに生成するため、load_data get_errorandを無視してください..vec2str

import pandas as pd
import numpy as np
import multiprocessing

def vec2str(vec):
    res = ''
    for dim in len(vec):
        res += str(vec[dim]) + ','
    return res

def load_data(heads, filename, sep,header=None):
    data = pd.read_table(filename, sep=sep, header=header, names=heads)
    rp = data.pivot_table(columns=['sid'],index=['uid'],values=['rating'])#not generally...
    Q = rp.fillna(0)
    Q = Q.values
    W = Q >0.5
    W[W == True] = 1
    W[W == False] = 0
    W = W.astype(np.float64, copy=False)
    return Q, W, rp

def get_error(Q, X, Y, W):
    return np.sum((W * (Q - np.dot(X, Y)))**2)

'''
X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) +  * np.eye(),
                               np.dot(, np.dot(np.diag(), Q[u].T))).T
'''
def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
    return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T

'''
Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
                                 np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
'''

def ALS_Y(X, Wi, Q, lambda_, n_factors, i):
    return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
                                 np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))



if __name__ == "__main__":
    
    lambda_ = 0.1
    n_factors = 40
    filename = 'data_songID'
    n_iterations = 20
    #Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
    Q = np.random.rand(1000,1000)
    m, n = Q.shape
    W = np.eye(1000)
    print 'Loading data finished, ', 'size: ', Q.shape
    print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
    X = 5 * np.random.rand(m, n_factors) 
    Y = 5 * np.random.rand(n_factors, n)
    errors = []
    for ii in range(n_iterations):
        X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors), 
                        np.dot(Y, Q.T)).T
        Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors),
                        np.dot(X.T, Q))
        if ii % 100 == 0:
            print('{}th iteration is completed'.format(ii))
        errors.append(get_error(Q, X, Y, W))
        Q_hat = np.dot(X, Y)
        print('Error of rated movies: {}'.format(get_error(Q, X, Y, W)))
    print errors
    #####ALS start....#####
    print '*'*100
    weighted_errors = []
    for ii in range(n_iterations):
        pool = multiprocessing.Pool(processes=12)
        result = []
        idx = []
        for u, Wu in enumerate(W):
            idx.append(u)
            result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
        pool.close()
        pool.join()
        for u, vector in zip(idx, result):
            X[u] = vector.get()
        ######################################
        pool = multiprocessing.Pool(processes=12)
        result = []
        idx = []
        for i, Wi in enumerate(W.T):
            idx.append(i)
            result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
        pool.close()
        pool.join()
        for i, vector in zip(idx, result):
            Y[:,i]  = vector.get()
        error = get_error(Q, X, Y, W)
        weighted_errors.append(error)
        print '{}th iteration is completed'.format(ii), 'error: ',error

    weighted_Q_hat = np.dot(X,Y)
    print weighted_errors
    X.tofile('X.bin')
    Y.tofile('Y.bin')
    latent_user_file = open('user_latent','w')
    for idx in len(rp.axes[0]):
        latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n')

    latent_mid_file = open('mid_latent', 'w')
    for idx in len(rp.axes[1]):
        latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n')
4

0 に答える 0