1

計算速度を下げるためにPythonでマルチプロセッシングを構築しようとしていますが、マルチプロセッシングの後、全体の計算速度が大幅に低下したようです。4 つの異なるプロセスを作成し、dataFrame を 4 つの異なるデータフレームに分割しました。これが各プロセスへの入力になります。各プロセスのタイミングを計った後、オーバーヘッド コストが大きいように見え、これらのオーバーヘッド コストを削減する方法があるかどうか疑問に思っていました。

私はwindows7、python 3.5を使用しており、私のマシンには8つのコアがあります。

def doSomething(args, dataPassed,):

    processing data, and calculating outputs

def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print ('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())

    pool.close()
    pool.join()

def nestedApply(df):

    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)
    res =  [output Tables]
    return res

if __name__ == '__main__':

data = pd.read_sql_query(query, conn)

parallelize_dataframe(data, nestedApply)
4

1 に答える 1

1

DataFrame をチャンクとして提供する代わりに、キューを使用することをお勧めします。各チャンクをコピーするには多くのリソースが必要であり、これにはかなりの時間がかかります。DataFrame が非常に大きい場合、メモリが不足する可能性があります。キューを使用すると、パンダの高速イテレータの恩恵を受けることができます。これが私のアプローチです。オーバーヘッドは、ワーカーの複雑さに応じて減少します。残念ながら、私のワーカーはそれを実際に示すには単純にはほど遠いですが、sleep複雑さを少しシミュレートしています。

import pandas as pd
import multiprocessing as mp
import numpy as np
import time


def worker(in_queue, out_queue):
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.1)
        out_queue.put((row[0], value))

if __name__ == "__main__":
    # fill a DataFrame
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue)) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    # iterator over rows
    it = df.itertuples()

    # fill queue and get data
    # code fills the queue until a new element is available in the output
    # fill blocks if no slot is available in the in_queue
    for i in range(len(df)):
        while out_queue.empty():
            # fill the queue
            try:
                row = next(it)
                in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
            except StopIteration:
                break
        row_data = out_queue.get()
        df.loc[row_data[0], "Result"] = row_data[1]

    # signals for processes stop
    for p in process:
        in_queue.put('STOP')

    # wait for processes to finish
    for p in process:
        p.join()

それを使用numProc = 2すると、ループごとに 50 秒かかり、numProc = 42 倍の速さになります。

于 2016-11-23T10:39:25.273 に答える