私はいくつかの Python コードに取り組んでおり、大量のデータに対して実行する必要があるため、マルチプロセッシング モジュールを使用することにしました。ただし、プロセスの数を変えると、明らかに望ましくない異なる結果が得られることがわかりました。
私のコードにはある程度のランダム性がありますが、中央プロセスでプログラムが実行されるとすぐにランダム シードを修正します (ランダム シードは、ラベル付けされたデータポイントとラベル付けされていないデータポイントを選択するために使用されます)。サブプロセスにはランダム性がなく、単純に行列またはベクトル演算を計算します。
関連するコードは次のとおりです。関連する numpy および scipy モジュールを使用します。残りのコードはセットアップを行うだけで、当面の問題には関係ありません。
関数update_pq_wrapper
はサブプロセスを生成します。関数update_p
およびupdate_q
は、データのサブセットに対して呼び出される関数です。 update_pq_wrapper
サブプロセスから結果を収集し、それらをまとめます。W
アフィニティ マトリックスを表すグローバル変数です。
def update_p(startIdx, endIdx, betaMatrix, out_q):
p = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_colsum = W.sum(1)
for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
g = gamma(W_colsum, i)
dist = np.exp(betaMatrix[i,:] / g)
dist /= np.sum(dist)
p[i-startIdx,:] = dist
out_q.put(p)
def update_q(startIdx, endIdx, sumMatrix, out_q):
q = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_rowsum = W.sum(0)
for i in xrange(startIdx, endIdx):
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
labeled = 1 if L[i] else 0
dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
dist /= np.sum(dist)
q[i-startIdx,:] = dist
out_q.put(q)
def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
startIdx = xrange(0, dataset_size, chunksize)
prevIter = 1 - curIter
procs = []
out_q = Queue()
for i in range(ncores): #now, start each of the child processes that assembles the matrices individually
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
if update_step == 'p':
proc = Process(target=update_p, args=(start, end, mat, out_q))
else:
proc = Process(target=update_q, args=(start, end, mat, out_q))
procs.append(proc)
proc.start()
if update_step == 'p': #once completed, collect results
distMat = P1 if curIter else P0
else:
distMat = Q1 if curIter else Q0
for i in range(ncores):
p_chunk = out_q.get()
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
distMat[start:end,:] = p_chunk
for proc in procs:
proc.join()
これを 1 つのプロセスだけで実行すると、次の結果が得られます (5 回の反復の精度): 2.16 --> 26.56 --> 27.37 --> 27.63 --> 27.83
ただし、これを 2 つのプロセスで実行すると、次のようになります: 2.16 --> 3.72 --> 18.74 --> 14.81 --> 16.51
4 つのプロセス: 2.16 --> 13.78 --> 13.85 --> 15.67 --> 13.12
特に上に貼り付けたコードを考えると、この動作の理由がわかりません。
アヴニーシュ
編集 (2013 年 1 月 15 日、太平洋時間午後 3 時 34 分)
何人かのリクエストに応じて、以下のコード全体をコピーして貼り付けます。また、コードが何をするのかを簡単に説明します。
基本的な考え方は、アフィニティ マトリックスで表されるグラフがあるということW
です。各ノードは、各ノードの可能なラベルのセットに対する確率分布を表します。したがって、ラベル付きの例では、ノードは縮退確率分布を持ち、ラベルに対応する行の値は 1 で、それ以外は 0 です。ラベル付けされていないノードの場合、各ノードの結果のラベルは分布であり、この分布の MAP ポイント推定を取得して、そのポイントのラベルを取得できます。アプローチの詳細については、こちらを参照してください。目的関数は、交互最小化として知られる手法を使用して解決されます。この手法では、2 つの確率分布が提案され (p
およびq
コード内で)、分布が同じ値に収束するまで繰り返します。
コメンターの 1 人が示唆したproc.join()
ように、参加後に発生する操作の上になるようにパーツを移動しました。これにより、サブプロセスが生成された段階を超えてコードが進行しないようで、キーボードから実行を中断する必要があります。おそらく、私は何か間違ったことをしているだけです。
#!/usr/bin/python
import sys, commands, string, cPickle
import numpy as np
import scipy.sparse as sp
import scipy.stats as stats
import scipy.linalg as la
from math import ceil
from time import clock
from multiprocessing import Process, Queue
from Queue import Empty
np.random.seed(42)
if not len(sys.argv) == 9:
print 'ERROR: Usage: python alternating_minimization.py <binary data or sim matrix> <labels_file> <iterations> <num cores> <label percent> <v> <mu> <alpha>'
sys.exit()
########################################
# Main Parameters
########################################
similarity_file = sys.argv[1] #output of simgraph_construction.py
labels_file = sys.argv[2]
niterations = int(sys.argv[3])
ncores = int(sys.argv[4])
########################################
# meta parameters
########################################
label_percent = float(sys.argv[5])
v = float(sys.argv[6])
mu = float(sys.argv[7])
alpha = float(sys.argv[8])
########################################
# load the data file (output of simgraph_construction.py) which is already in numpy format
########################################
W = cPickle.load(open(similarity_file, 'r'))
#print some summary statistics about the similarity matrix file
print "Number of examples: %d"%(W.shape[0])
print "Sim Matrix: nnz = %d, density = %.2f percent, average # of neighbors per example: %.2f"%(W.nnz, 100*(float(W.nnz)/(W.shape[0]**2)), float(W.nnz)/W.shape[0])
########################################
# load the labels
########################################
def convertLabels(labels):
unique_labels = np.unique(labels)
label_dict = {}
idx = 0
for label in unique_labels:
label_dict[label] = idx
idx += 1
return label_dict
LABELS = np.load(labels_file)
print "Number of unique labels: %d"%(np.unique(LABELS).shape)
label_dict = convertLabels(LABELS)
NEW_LABELS = np.array([label_dict[label] for label in LABELS])
dataset_size = LABELS.shape[0]
LABELS = NEW_LABELS
W = W + alpha*sp.identity(dataset_size)
########################################
# define the labeled and unlabeled idxs
########################################
def make_test_set():
idx = np.random.rand(dataset_size)
l = (idx < label_percent)
u = (idx >= label_percent)
return l,u
L,U = make_test_set()
def createRDistribution(label_bool, labels):
rows = np.array(range(0, dataset_size), dtype=int)
label_idx = np.where(~label_bool)
rows = np.delete(rows, label_idx)
cols = np.delete(labels, label_idx)
vals = np.ones((rows.shape[0],1)).ravel()
sparseR = sp.csc_matrix((vals, (rows, cols)), shape=(dataset_size, max(labels)+1))
return sparseR
########################################
# make the distributions for the data
########################################
R = createRDistribution(L, LABELS) #labeled distribution is sparse
classNoLabel = np.where(R.sum(0) == 0)
#print classNoLabel #need to figure out how many classes are unrepresented in the labeld set
Q0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
Q0 += 1.0 / Q0.shape[1]
Q1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
def gamma(W_sum,i): #W_sum is sum across all columns of sim matrix W
return v + mu * W_sum[i]
def update_p(startIdx, endIdx, betaMatrix, out_q):
p = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_colsum = W.sum(1)
for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
g = gamma(W_colsum, i)
dist = np.exp(betaMatrix[i,:] / g)
dist /= np.sum(dist)
p[i-startIdx,:] = dist
out_q.put(p)
def update_q(startIdx, endIdx, sumMatrix, out_q):
q = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_rowsum = W.sum(0)
for i in xrange(startIdx, endIdx):
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
labeled = 1 if L[i] else 0
dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
dist /= np.sum(dist)
q[i-startIdx,:] = dist
out_q.put(q)
def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
startIdx = xrange(0, dataset_size, chunksize)
prevIter = 1 - curIter
procs = []
out_q = Queue()
for i in range(ncores): #now, start each of the child processes that assembles the matrices individually
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
if update_step == 'p':
proc = Process(target=update_p, args=(start, end, mat, out_q))
else:
proc = Process(target=update_q, args=(start, end, mat, out_q))
procs.append(proc)
proc.start()
if update_step == 'p': #once completed, collect results
distMat = P1 if curIter else P0
else:
distMat = Q1 if curIter else Q0
for i in range(ncores):
p_chunk = out_q.get()
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
distMat[start:end,:] = p_chunk
for proc in procs:
proc.join()
def compute_tvdist(P,Q):
tv_dist = 0
for i in range(0, dataset_size):
tv_dist += max(np.absolute(P[i,:] - Q[i,:]))
return tv_dist/dataset_size
def main(argv):
accuracyArr = []
tvdistArr = []
print >> sys.stderr, 'Starting %d iterations...' % niterations
chunksize = int(ceil(dataset_size/float(ncores)))
for n in xrange(1,niterations+1):
print >> sys.stderr, 'Iteration %d' % n
idx = n % 2
q_prev = Q1 if not idx else Q0
p_cur = P1 if idx else P0
#print q_prev
start_time = clock()
mat = -v + mu*(W*(np.log(q_prev)-1))
end_time = clock()
#print mat
print "Time taken to compute Beta Matrix: %.2f seconds"%(end_time-start_time)
start_time=clock()
update_pq_wrapper(idx, chunksize, mat, 'p')
end_time=clock()
print "Time taken to update P matrix: %.2f seconds"%(end_time-start_time)
if not n == niterations:
start_time = clock()
mat = W.T*p_cur
end_time = clock()
print "Time taken to compute Sum Matrix: %.2f seconds"%(end_time-start_time)
start_time = clock()
update_pq_wrapper(idx, chunksize, mat, 'q')
end_time = clock()
print "Time taken to update Q matrix: %.2f seconds"%(end_time-start_time)
## Evaluation ##
evalMat = P1 if idx else P0
predLabel = np.argmax(evalMat, axis=1) #gives the index (column)
accuracy = float(np.sum(predLabel[np.where(U)] == LABELS[np.where(U)]) )/ LABELS[np.where(U)].shape[0]
print "Accuracy: %.2f"%(accuracy*100)
accuracyArr.append(accuracy)
totalVar = []
if n != niterations:
tv_dist = compute_tvdist(P1, Q1) if idx else compute_tvdist(P0, Q0)
else:
tv_dist = compute_tvdist(P1, Q0) if idx else compute_tvdist(P0, Q1)
print "Average Total Variation Distance is %.3f"%(tv_dist)
tvdistArr.append(tv_dist)
print "Summary of final probability density matrix: "
print evalMat
print '\t'.join([str(round(acc,4)) for acc in accuracyArr])