6

{string:list}エントリの辞書Dがあり、Dの文字列のペア(s1、s2)に対して関数f(D [s1]、D [s2])->floatを計算します。

さらに、m [ID1、ID2]=1.0などの割り当てを実行できるカスタムマトリックスクラスLabeledNumericMatrixを作成しました。

f(x、y)を計算し、s1 = s2の場合を含め、文字列Sのセット内のすべての2タプルの結果をm [x、y]に格納する必要があります。これはループとしてコーディングするのは簡単ですが、セットSのサイズが10,000以上などの大きな値に大きくなるため、このコードの実行にはかなりの時間がかかります。

ラベル付けされた行列mに格納する結果は、相互に依存していません。したがって、Pythonのマルチスレッドまたはマルチプロセスサービスを使用して、この計算を並列化するのは簡単なようです。ただし、cPythonでは、スレッド化によってf(x、y)の計算とm [x、y]の格納を同​​時に実行することは実際にはできないため、マルチプロセスが私の唯一の選択肢のようです。ただし、マルチプロセスは、10000x10000要素を含むラベル付きマトリックス構造など、プロセス間で約1GBのデータ構造を渡すように設計されているとは思いません。

誰かが(a)アルゴリズムの並列化を回避する必要があるかどうか、および(b)並列化を実行できるかどうか、できればcPythonでその方法をアドバイスできますか?

4

5 に答える 5

6

最初のオプション -サーバー プロセス

サーバー プロセスを作成します。これは、データ構造への並列アクセスを可能にする Multiprocessing パッケージの一部です。このようにして、すべてのプロセスがデータ構造に直接アクセスし、他のプロセスをロックします。

ドキュメントから:

サーバープロセス

Manager() によって返されるマネージャー オブジェクトは、Python オブジェクトを保持するサーバー プロセスを制御し、他のプロセスがプロキシを使用してそれらを操作できるようにします。

Manager() によって返されるマネージャは、タイプ list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value、および Array をサポートします。

2 番目のオプション - ワーカーのプール

worker のPool、入力 Queue 、および結果 Queue を作成します。

  • プロデューサとして機能するメイン プロセスは、ペア (s1、s2) を入力キューに供給します。
  • 各ワーカー プロセスは、入力キューからペアを読み取り、結果を出力キューに書き込みます。
  • メイン スレッドは結果キューから結果を読み取り、結果ディクショナリに書き込みます。

3 番目のオプション - 独立した問題に分割する

あなたのデータは独立しています: f( D[s i ],D[s j ] ) は、 f( D[s k ],D[s l ] )から独立した隔離された問題です。さらに、各ペアの計算時間はかなり等しいか、少なくとも同程度である必要があります。

タスクをn 個の入力セットに分割します。ここで、 nは計算ユニット (コア、またはコンピューター) の数です。各入力セットを異なるプロセスに与え、出力を結合します。

于 2012-02-20T10:10:30.923 に答える
2

でパフォーマンスが向上することは絶対にありませんthreading。これは、CPUバウンドタスクには不適切なツールです。

したがって、可能な唯一の選択肢はmultiprocessingですが、ビッグデータ構造があるため、mmap(かなり低レベルですが組み込み)またはRedis(おいしい高レベルAPIですが、インストールして構成する必要があります)のようなものをお勧めします。

于 2012-02-20T10:14:14.963 に答える
1

コードをプロファイリングしましたか? コストが高すぎる f を計算しているだけなのか、それとも結果をデータ構造に格納しているのか (またはその両方) ですか?

f が支配的である場合は、並列化について心配する前に、アルゴリズムを改善できないことを確認する必要があります。おそらくcythonを使用して、関数の一部またはすべてを C 拡張に変換することで、大幅に高速化できる場合があります。マルチプロセッシングを使用する場合、プロセス間でデータ構造全体を渡す必要がある理由がわかりませんか?

結果を行列に格納するのが高すぎる場合は、より効率的なデータ構造 ( array.arraynumpy.ndarrayなど) を使用してコードを高速化できます。カスタム マトリックス クラスの設計と実装を非常に慎重に行っていない限り、ほぼ確実にそれらよりも遅くなります。

于 2012-02-20T12:34:18.077 に答える
0

皆様からのご回答ありがとうございました。

私は提案された問題の解決策(「その」解決策ではない)を作成しました。他の人がそれを役立つと思うかもしれないので、ここにコードを投稿します。私の解決策は、AdamMatanによって提案されたオプション1と3のハイブリッドです。コードには、viセッションの行番号が含まれています。これは、以下の説明に役立ちます。

 12 # System libraries needed by this module.
 13 import numpy, multiprocessing, time
 14 
 15 # Third-party libraries needed by this module.
 16 import labeledMatrix
 17 
 18 # ----- Begin code for this module. -----
 19 from commonFunctions import debugMessage
 20 
 21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
 22                             exceptionType=ValueError, useNumType=numpy.float, verbose=False,
 23                             maxProcesses=None, processCheckTime=1.0 ):
 24  """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
 25  parsed by [fvFileParser].
 26  [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
 27 
 28  If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
 29 
 30  [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
 31  [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
 32  [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
 33 
 34  Return: a LabeledNumericMatrix with corresponding row and column IDs."""
 35 
 36  # Setup row/col ID information.
 37  useColIDs = list( colIDs )
 38  useRowIDs = rowIDs or useColIDs
 39  featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
 40  verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
 41  featureIDs = featureData.keys()
 42  absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
 43  if absentIDs: 
 44   raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
 45  # Otherwise, proceed to creation of matrix.
 46  resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
 47  calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
 48  
 49  # Setup data structures needed for parallelization.
 50  numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
 51  assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
 52  dataManager = multiprocessing.Manager()
 53  sharedFeatureData = dataManager.dict( featureData )
 54  resultQueue = multiprocessing.Queue() 
 55  # Assign jobs evenly through number of processors available.
 56  jobList = [ list() for i in range(numSubprocesses) ]
 57  calculationNumber = 0 # Will hold total number of results stored.
 58  if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
 59   remainingIDs = list( useRowIDs )
 60   while remainingIDs:
 61    firstID = remainingIDs[0]
 62    for secondID in remainingIDs:
 63     jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
 64     calculationNumber += 1
 65    remainingIDs.remove( firstID )
 66  else: # Straight processing one at a time.
 67   for rowID in useRowIDs:
 68    for colID in useColIDs:
 69     jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
 70     calculationNumber += 1
 71     
 72  verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
 73  # Define a function to perform calculation and store results
 74  def runJobs( scoreFunc, pairs, featureData, resultQueue ):
 75   for pair in pairs:
 76    score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
 77    resultQueue.put( ( pair, score ) )
 78   verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
 79   
 80   
 81  # Create processes to perform parallelized computing.
 82  processes = list()
 83  for num in range(numSubprocesses):
 84   processes.append( multiprocessing.Process( target=runJobs,
 85                                              args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
 86  # Launch processes and wait for them to all complete.
 87  import Queue # For Queue.Empty exception.
 88  for p in processes:
 89   p.start()
 90  assignmentsCompleted = 0
 91  while assignmentsCompleted < calculationNumber:
 92   numActive = [ p.is_alive() for p in processes ].count( True )
 93   verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
 94               ( assignmentsCompleted, calculationNumber, numActive ) )
 95   while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
 96    try: 
 97     pair, score = resultQueue.get( block=False )
 98     resultMatrix[ pair[0], pair[1] ] = score
 99     assignmentsCompleted += 1
100     if calculateSymmetric:
101      resultMatrix[ pair[1], pair[0] ] = score
102    except Queue.Empty:
103     break 
104   if numActive == 0: finished = True
105   else:
106    time.sleep( processCheckTime )
107  # Result queue emptied and no active processes remaining - completed calculations.
108  return resultMatrix
109 ## end of createSimilarityMatrix()

36〜47行目は、元の質問の一部であった問題の定義に関連する単なる予備的なものです。cPythonのGILを回避するためのマルチプロセッシングのセットアップは49〜56行目で、57〜70行目は細分化されたタスクを均等に作成するために使用されます。行/列IDのリストが40,000程度に達すると、製品が大量のメモリを消費することになるため、itertools.productの代わりに57〜70行目のコードが使用されます。

実行される実際の計算は74〜78行目であり、ここではID->vectorエントリの共有ディクショナリと共有結果キューが使用されます。

行81〜85は、実際にはまだ開始されていませんが、実際のP​​rocessオブジェクトをセットアップします。

私の最初の試み(ここには示されていません)では、「try ... resultQueue.get()andassignただし...」コードは実際には外部制御ループの外側にありました(すべての計算が終了したわけではありません)。そのバージョンのコードを9x9マトリックスの単体テストで実行したところ、問題はありませんでした。ただし、200x200以上に移動すると、実行間でコードを変更しなかったにもかかわらず、このコードがハングすることがわかりました。

このディスカッション(http://bugs.python.org/issue8426)とマルチプロセスの公式ドキュメントによると、基盤となる実装のパイプ/ソケットサイズが非常に大きくない場合、multiprocess.Queueの使用がハングする可能性があります。したがって、私のソリューションとしてここに示したコードは、プロセスの完了を確認しながら定期的にキューを空にし(91〜106行目を参照)、子プロセスが新しい結果を入力し続け、パイプがいっぱいになるのを防ぎます。

1000x1000のより大きな行列でコードをテストしたとき、計算コードがキューおよび行列割り当てコードよりもかなり前に終了していることに気付きました。cProfileを使用すると、1つのボトルネックがデフォルトのポーリング間隔processCheckTime = 1.0(23行目)であり、この値を下げると結果の速度が向上することがわかりました(タイミングの例については投稿の下部を参照してください)。これは、Pythonでのマルチプロセッシングに不慣れな他の人々にとって役立つ情報かもしれません。

全体として、これはおそらく可能な限り最良の実装ではありませんが、さらなる最適化の出発点を提供します。よく言われるように、並列化による最適化には適切な分析と思考が必要です。

タイミングの例、すべて8個のCPUを使用。

200x200(20100計算/割り当て)

t = 1.0:実行時間18秒

t = 0.01:実行時間3秒

500x500(125250の計算/割り当て)

t = 1.0:実行時間86秒

t = 0.01:実行時間23秒

誰かがコードをコピーして貼り付けたい場合に備えて、これは私が開発の一部に使用した単体テストです。明らかに、ラベル付けされたマトリックスクラスコードはここにはなく、指紋リーダー/スコアラーコードは含まれていません(ただし、独自のコードを作成するのは非常に簡単です)。もちろん、誰かを助けてくれるなら、そのコードも共有できてうれしいです。

112 def unitTest():
113  import cStringIO, os
114  from fingerprintReader import MismatchKernelReader
115  from fingerprintScorers import FeatureVectorLinearKernel
116  exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117  exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK"  + os.linesep )
118  exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119  exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120  exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121  exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122  exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123  exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124  exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125  exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126  exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127  exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128  columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129                "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130  m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131                              verbose=True, )
132  m.SetOutputPrecision( 6 )
133  print m
134 
135 ## end of unitTest()
于 2012-03-21T15:21:04.187 に答える
0

3 月 21 日に投稿されたコードに添付された最後のコメントを参照すると、次の 2 つの問題が発生したため、特定のタスクでは multiprocessing.Pool + SQLite (pysqlite2) を使用できないことがわかりました。

(1) デフォルトの接続を使用すると、最初のワーカーを除き、挿入クエリを実行した他のすべてのワーカー プロセスは 1 回だけ実行されます。(2) 接続キーワードを check_same_thread=False に変更すると、ワーカーの完全なプールが使用されますが、一部のクエリのみが成功し、一部のクエリは失敗します。各ワーカーが time.sleep(0.01) も実行すると、クエリの失敗数は減少しましたが、完全ではありませんでした。(3) それほど重要ではありませんが、10 個の挿入クエリの小さなジョブ リストであっても、ハードディスクの読み取り/書き込みが必死に聞こえました。

次に MySQL-Python に頼ったところ、うまくいきました。確かに、MySQL サーバー デーモン、ユーザー、およびそのユーザーのデータベースをセットアップする必要がありますが、これらの手順は比較的簡単です。

これが私のために働いたサンプルコードです。明らかに最適化できますが、マルチプロセッシングの使用を開始する方法を探している人に基本的な考え方を伝えます.

  1 from multiprocessing import Pool, current_process
  2 import MySQLdb
  3 from numpy import random
  4
  5 
  6 if __name__ == "__main__":
  7  
  8   numValues   = 50000
  9   tableName   = "tempTable"
 10   useHostName = ""
 11   useUserName = ""  # Insert your values here.
 12   usePassword = ""
 13   useDBName   = ""
 14   
 15   # Setup database and table for results.
 16   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 17   topCursor = dbConnection.cursor()
 18   # Assuming table does not exist, will be eliminated at the end of the script.
 19   topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
 20   topCursor.close() 
 21   dbConnection.close()
 22   
 23   # Define simple function for storing results.
 24   def work( storeValue ):
 25     #print "%s storing value %f" % ( current_process().name, storeValue )
 26     try:
 27       dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 28       cursor = dbConnection.cursor()
 29       cursor.execute( "SET AUTOCOMMIT=1" )
 30       try:
 31         query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
 32         #print query
 33         cursor.execute( query )
 34       except:
 35         print "Query failed."
 36       
 37       cursor.close()
 38       dbConnection.close()
 39     except: 
 40       print "Connection/cursor problem."
 41   
 42   
 43   # Create set of values to assign
 44   values = random.random( numValues )
 45   
 46   # Create pool of workers 
 47   pool = Pool( processes=6 )
 48   # Execute assignments.
 49   for value in values: pool.apply_async( func=work, args=(value,) )
 50   pool.close()
 51   pool.join()
 52 
 53   # Cleanup temporary table.
 54   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 55   topCursor = dbConnection.cursor()
 56   topCursor.execute( 'DROP TABLE %s' % tableName )
 57   topCursor.close()
 58   dbConnection.close()
于 2012-03-27T07:20:27.517 に答える