8

Multiprocessing パッケージ (Amazon EC2 の Ubuntu 12.04 で numpy 1.7.0 を使用した python 2.73) を使用して、いくつかの単純な numpy ベースの行列代数計算を並行して実行しているときに、システム エラー (以下を参照) が発生します。私のコードは小さい行列サイズでは正常に動作しますが、大きい行列サイズではクラッシュします (利用可能なメモリが十分にある場合)。

私が使用するマトリックスのサイズはかなりのものです (私のコードは 1000000x10 のフロート密度の高いマトリックスでは正常に動作しますが、1000000x500 のマトリックスではクラッシュします - ちなみに、これらのマトリックスをサブプロセスとの間で受け渡ししています)。10 対 500 は実行時パラメーターであり、他のすべては同じままです (入力データ、他の実行時パラメーターなど)。

また、python3を使用して同じ(移植された)コードを実行しようとしました-より大きなマトリックスの場合、サブプロセスは(python 2.7のようにクラッシュするのではなく)スリープ/アイドルモードになり、プログラム/サブプロセスは何もせずにハングします。小さい行列の場合、コードは python3 で正常に実行されます。

どんな提案でも大歓迎です(私はここでアイデアを使い果たしています)

エラーメッセージ:

Exception in thread Thread-5: Traceback (most recent call last):  
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()   File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task) SystemError: NULL result without error in PyObject_Call

私が使用するマルチプロセッシングコード:

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add result queue to the list of argument tuples.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create and initialize the pool of workers.
    pool = mp.Pool(processes = nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    # Run the processes.
    pool.close()
    pool.join()
    # Return the results.
    return [resultQueue.get() for i in range(len(listOfInputs))]

以下は、各サブプロセスに対して実行される「proc」です。基本的に、numpy を使用して線形方程式の多くのシステムを解き (サブプロセス内で必要な行列を構築します)、結果を別の行列として返します。繰り返しになりますが、1 つの実行時パラメーターの値が小さい場合は問題なく動作しますが、値が大きい場合はクラッシュ (または python3 でハング) します。

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()
4

1 に答える 1