2

私はいくつかの Python コードに取り組んでおり、大量のデータに対して実行する必要があるため、マルチプロセッシング モジュールを使用することにしました。ただし、プロセスの数を変えると、明らかに望ましくない異なる結果が得られることがわかりました。

私のコードにはある程度のランダム性がありますが、中央プロセスでプログラムが実行されるとすぐにランダム シードを修正します (ランダム シードは、ラベル付けされたデータポイントとラベル付けされていないデータポイントを選択するために使用されます)。サブプロセスにはランダム性がなく、単純に行列またはベクトル演算を計算します。

関連するコードは次のとおりです。関連する numpy および scipy モジュールを使用します。残りのコードはセットアップを行うだけで、当面の問題には関係ありません。

関数update_pq_wrapperはサブプロセスを生成します。関数update_pおよびupdate_qは、データのサブセットに対して呼び出される関数です。 update_pq_wrapperサブプロセスから結果を収集し、それらをまとめます。Wアフィニティ マトリックスを表すグローバル変数です。

def update_p(startIdx, endIdx, betaMatrix, out_q):
  p = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_colsum = W.sum(1)
  for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1                                                                                                                                                                            
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)                                                                                                                                                           
    g = gamma(W_colsum, i)
    dist = np.exp(betaMatrix[i,:] / g)
    dist /= np.sum(dist)
    p[i-startIdx,:] = dist
  out_q.put(p)

def update_q(startIdx, endIdx, sumMatrix, out_q):
  q = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_rowsum = W.sum(0)
  for i in xrange(startIdx, endIdx):
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)
    labeled = 1 if L[i] else 0
    dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
    dist /= np.sum(dist)
    q[i-startIdx,:] = dist
  out_q.put(q)

def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
  startIdx = xrange(0, dataset_size, chunksize)
  prevIter = 1 - curIter
  procs = []
  out_q = Queue()
  for i in range(ncores):    #now, start each of the child processes that assembles the matrices individually                                                                                                                               
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    if update_step == 'p':
        proc = Process(target=update_p, args=(start, end, mat, out_q))
    else:
        proc = Process(target=update_q, args=(start, end, mat, out_q))
    procs.append(proc)
    proc.start()
  if update_step == 'p':    #once completed, collect results                                                                                                                                                                                
    distMat = P1 if curIter else P0
  else:
    distMat = Q1 if curIter else Q0
  for i in range(ncores):
    p_chunk = out_q.get()
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    distMat[start:end,:] = p_chunk
  for proc in procs:
    proc.join()

これを 1 つのプロセスだけで実行すると、次の結果が得られます (5 回の反復の精度): 2.16 --> 26.56 --> 27.37 --> 27.63 --> 27.83

ただし、これを 2 つのプロセスで実行すると、次のようになります: 2.16 --> 3.72 --> 18.74 --> 14.81 --> 16.51

4 つのプロセス: 2.16 --> 13.78 --> 13.85 --> 15.67 --> 13.12

特に上に貼り付けたコードを考えると、この動作の理由がわかりません。

アヴニーシュ

編集 (2013 年 1 月 15 日、太平洋時間午後 3 時 34 分)

何人かのリクエストに応じて、以下のコード全体をコピーして貼り付けます。また、コードが何をするのかを簡単に説明します。

基本的な考え方は、アフィニティ マトリックスで表されるグラフがあるということWです。各ノードは、各ノードの可能なラベルのセットに対する確率分布を表します。したがって、ラベル付きの例では、ノードは縮退確率分布を持ち、ラベルに対応する行の値は 1 で、それ以外は 0 です。ラベル付けされていないノードの場合、各ノードの結果のラベルは分布であり、この分布の MAP ポイント推定を取得して、そのポイントのラベルを取得できます。アプローチの詳細については、こちらを参照してください。目的関数は、交互最小化として知られる手法を使用して解決されます。この手法では、2 つの確率分布が提案され (pおよびqコード内で)、分布が同じ値に収束するまで繰り返します。

コメンターの 1 人が示唆したproc.join()ように、参加後に発生する操作の上になるようにパーツを移動しました。これにより、サブプロセスが生成された段階を超えてコードが進行しないようで、キーボードから実行を中断する必要があります。おそらく、私は何か間違ったことをしているだけです。

#!/usr/bin/python
import sys, commands, string, cPickle
import numpy as np
import scipy.sparse as sp
import scipy.stats as stats
import scipy.linalg as la
from math import ceil
from time import clock
from multiprocessing import Process, Queue
from Queue import Empty

np.random.seed(42)
if not len(sys.argv) == 9:
  print 'ERROR: Usage: python alternating_minimization.py <binary data or sim matrix> <labels_file> <iterations> <num cores> <label percent> <v> <mu> <alpha>'
  sys.exit()

########################################                                                                                                                                                                                                      
# Main Parameters                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
similarity_file = sys.argv[1] #output of simgraph_construction.py                                                                                                                                                                             
labels_file = sys.argv[2]
niterations = int(sys.argv[3])
ncores = int(sys.argv[4])

########################################                                                                                                                                                                                                      
# meta parameters                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
label_percent = float(sys.argv[5])
v = float(sys.argv[6])
mu = float(sys.argv[7])
alpha = float(sys.argv[8])

########################################                                                                                                                                                                                                      
# load the data file (output of simgraph_construction.py) which is already in numpy format                                                                                                                                                    
########################################                                                                                                                                                                                                      
W = cPickle.load(open(similarity_file, 'r'))
#print some summary statistics about the similarity matrix file                                                                                                                                                                               
print "Number of examples: %d"%(W.shape[0])
print "Sim Matrix: nnz = %d, density = %.2f percent, average # of neighbors per example: %.2f"%(W.nnz, 100*(float(W.nnz)/(W.shape[0]**2)), float(W.nnz)/W.shape[0])

########################################                                                                                                                                                                                                      
# load the labels                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
def convertLabels(labels):
  unique_labels = np.unique(labels)
  label_dict = {}
  idx = 0
  for label in unique_labels:
    label_dict[label] = idx
    idx += 1
  return label_dict

LABELS = np.load(labels_file)
print "Number of unique labels: %d"%(np.unique(LABELS).shape)
label_dict = convertLabels(LABELS)
NEW_LABELS = np.array([label_dict[label] for label in LABELS])
dataset_size = LABELS.shape[0]
LABELS = NEW_LABELS
W = W + alpha*sp.identity(dataset_size)

########################################                                                                                                                                                                                                      
# define the labeled and unlabeled idxs                                                                                                                                                                                                       
########################################                                                                                                                                                                                                      
def make_test_set():
  idx = np.random.rand(dataset_size)
  l = (idx < label_percent)
  u = (idx >= label_percent)
  return l,u

L,U = make_test_set()
def createRDistribution(label_bool, labels):
  rows = np.array(range(0, dataset_size), dtype=int)
  label_idx = np.where(~label_bool)
  rows = np.delete(rows, label_idx)
  cols = np.delete(labels, label_idx)
  vals = np.ones((rows.shape[0],1)).ravel()
  sparseR = sp.csc_matrix((vals, (rows, cols)), shape=(dataset_size, max(labels)+1))
  return sparseR

########################################                                                                                                                                                                                                      
# make the distributions for the data                                                                                                                                                                                                         
########################################                                                                                                                                                                                                      
R = createRDistribution(L, LABELS) #labeled distribution is sparse                                                                                                                                                                            
classNoLabel = np.where(R.sum(0) == 0)
#print classNoLabel #need to figure out how many classes are unrepresented in the labeld set                                                                                                                                                  
Q0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
Q0 += 1.0 / Q0.shape[1]
Q1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)

def gamma(W_sum,i): #W_sum is sum across all columns of sim matrix W                                                                                                                                                                          
  return v + mu * W_sum[i]

def update_p(startIdx, endIdx, betaMatrix, out_q):
  p = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_colsum = W.sum(1)
  for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1                                                                                                                                                                            
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)                                                                                                                                                           
    g = gamma(W_colsum, i)
    dist = np.exp(betaMatrix[i,:] / g)
    dist /= np.sum(dist)
    p[i-startIdx,:] = dist
  out_q.put(p)

def update_q(startIdx, endIdx, sumMatrix, out_q):
  q = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_rowsum = W.sum(0)
  for i in xrange(startIdx, endIdx):
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)
    labeled = 1 if L[i] else 0
    dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
    dist /= np.sum(dist)
    q[i-startIdx,:] = dist
  out_q.put(q)

def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
  startIdx = xrange(0, dataset_size, chunksize)
  prevIter = 1 - curIter
  procs = []
  out_q = Queue()
  for i in range(ncores):    #now, start each of the child processes that assembles the matrices individually                                                                                                                               
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    if update_step == 'p':
        proc = Process(target=update_p, args=(start, end, mat, out_q))
    else:
        proc = Process(target=update_q, args=(start, end, mat, out_q))
    procs.append(proc)
    proc.start()
  if update_step == 'p':    #once completed, collect results                                                                                                                                                                                
    distMat = P1 if curIter else P0
  else:
    distMat = Q1 if curIter else Q0
  for i in range(ncores):
    p_chunk = out_q.get()
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    distMat[start:end,:] = p_chunk
  for proc in procs:
    proc.join()

def compute_tvdist(P,Q):
  tv_dist = 0
  for i in range(0, dataset_size):
    tv_dist += max(np.absolute(P[i,:] - Q[i,:]))
  return tv_dist/dataset_size

def main(argv):
  accuracyArr = []
  tvdistArr = []
  print >> sys.stderr, 'Starting %d iterations...' % niterations
  chunksize = int(ceil(dataset_size/float(ncores)))
  for n in xrange(1,niterations+1):
    print >> sys.stderr, 'Iteration %d' % n
    idx = n % 2
    q_prev = Q1 if not idx else Q0
    p_cur = P1 if idx else P0
    #print q_prev                                                                                                                                                                                                                         
    start_time = clock()
    mat = -v + mu*(W*(np.log(q_prev)-1))
    end_time = clock()
    #print mat                                                                                                                                                                                                                            
    print "Time taken to compute Beta Matrix: %.2f seconds"%(end_time-start_time)
    start_time=clock()
    update_pq_wrapper(idx, chunksize, mat, 'p')
    end_time=clock()
    print "Time taken to update P matrix: %.2f seconds"%(end_time-start_time)
    if not n == niterations:
      start_time = clock()
      mat = W.T*p_cur
      end_time = clock()
      print "Time taken to compute Sum Matrix: %.2f seconds"%(end_time-start_time)
      start_time = clock()
      update_pq_wrapper(idx, chunksize, mat, 'q')
      end_time = clock()
      print "Time taken to update Q matrix: %.2f seconds"%(end_time-start_time)
    ## Evaluation ##                                                                                                                                                                                                                      
    evalMat = P1 if idx else P0                                                                                                                                                   
    predLabel = np.argmax(evalMat, axis=1) #gives the index (column)                                                                                                                                                                      
    accuracy = float(np.sum(predLabel[np.where(U)] == LABELS[np.where(U)]) )/ LABELS[np.where(U)].shape[0]
    print "Accuracy: %.2f"%(accuracy*100)
    accuracyArr.append(accuracy)
    totalVar = []
    if n != niterations:
        tv_dist = compute_tvdist(P1, Q1) if idx else compute_tvdist(P0, Q0)
    else:
        tv_dist = compute_tvdist(P1, Q0) if idx else compute_tvdist(P0, Q1)
    print "Average Total Variation Distance is %.3f"%(tv_dist)
    tvdistArr.append(tv_dist)
  print "Summary of final probability density matrix: "
  print evalMat
  print '\t'.join([str(round(acc,4)) for acc in accuracyArr])
4

1 に答える 1

2

これは解決されました。何を考えていたのかわかりませんが、明らかにサブプロセスから戻ると、データは非同期で書き込まれ、期待していた順序 (行の順序) ではありません。簡単な修正は、サブプロセスがデータのタプルと、データのどの部分を処理しているかを示すインジケーターを返すようにし、この情報を使用して結果のマトリックスを元に戻すことです。

于 2013-02-16T21:30:43.570 に答える