2

最近、研究用のクラスを含むプログラムを作成し、それを並列化しようとしました。Python 2.7 の multiprocessing.Process を JoinableQueue とマネージド データで使用すると、プログラムは最終的に無効なプロセスでハングします。

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
        try:
            consumers = [mp.Process(target=self._calc_parallel,
                         args=(force,)) for i in range(nprocs)]
            for w in consumers:
                w.start()

            # Enqueue jobs
            for i in range(self.totalsites):
                self.tasks.put(i)

            # Add a poison pill for each consumer
            for i in range(nprocs):
                self.tasks.put(None)

            self.tasks.close()
            self.tasks.join()

    #       for w in consumers:
    #           w.join()
        except:
            traceback.print_exc()

_calc_parallel は、他のいくつかのクラス メソッドを呼び出します。

http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethodsの他の場所にある copy_reg オプションを使用して、この目的で multiprocessing.Pool を使用しようとさえしました。

import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()

ただし、pool.map_async は self._calc_parallel を呼び出していないようです。私は両方のケース (プロセスとプール) で何かを見落としていることを知っていますが、何が何であるかは正確にはわかりません。通常、40,000 を超える要素を処理しています。

助けてくれてありがとう。

アップデート

他のいくつかの投稿を読んだ後、pathos.multiprocessing も試しました。

import pathos.multiprocessing as mp
class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data

    def _use_force(force):
        # Calculate data

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()

そして、以前の試みと同様に、これもメソッドを呼び出さなくてもすばやく処理できるようです。

更新 2

コードを改良して、巨大なクラスをより小さく、より管理しやすいコンポーネントに分割することにしました。ただし、pathos.multiprocessing を使用すると、以前に投稿したように別の状況に遭遇します (リンクを参照)。私の新しいコードには、計算に使用できるオブジェクトがあり、そのメソッドを介して値を返す必要があります。

import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()
    def calculateBondData(self, index):
        # Calculation
        return self.value
    def calculateNonBondedData(self, index):
        # Calculation
        return self.value
    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()
    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(self, *args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data, force)
    return matrix

別の機能だと思っていましたfunc(dataobj, force)が、これも役に立たないようです。現在の速度では、単一のプロセッサでの完全な計算には 1000 時間以上かかると見積もっています。

アップデート 3 (2015 年 4 月 30 日)

@MikeMcKernsの有益な洞察のおかげで、私は可能な解決策に落ち着いたかもしれません. iMac (クアッド コア) またはクラスターの 16 コア ノードでは、結合のない粗粒度 (CG) システムの場合、2 倍itertools.imapが最適なソリューション (1000 CG サイト) のように思われることがわかりました。軌道フレームあたり約 5.2 秒で。結合の詳細 (水を表す 3000 の CG サイト) を含むシステムに移ると、iMac (1 コアを使用) でitertools.imappathos.ThreadingPool.uimap(4 スレッド) 約 85 秒/フレームでクロックインします。@MikeMcKerns のコメントで提案されているように、プロセス プール (4 コア x 2)/スレッド プール (4 スレッド) を試みると、計算時間が 2.5 倍に増加しました。16 コア クラスター (32 pp/16 tp) では、この CG システムも遅くなります (約 160 秒/フレーム)。iMac (1 コア/4 スレッド) 上の 42,778 サイトと多数の結合を備えた CG システムは、約 58 分/フレームのクロックを記録する可能性があります。クラスターの 16 コア ノードでこの大規模システムをまだテストしていませんが、プロセス プール/スレッド プールを使用してさらに高速化できるかどうかはわかりません。

例:

# For a CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# For a system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

システムが大規模になればなるほど、マルチプロセッシングから得られるメリットが大きくなると思います。大規模な CG システム (42,778 サイト) は、0.02 秒/サイト (3000 CG サイト) または 0.05 秒/サイト (結合なしの 1000 サイト) と比較して、約 0.08 秒/サイトかかることがわかっています。

計算時間を短縮しようと努力しているうちに、一部の計算を削減できる領域 (global変数やアルゴリズムの変更など) を発見しましたが、本格的な multirpcoessing によってこれをさらに削減できれば、それは素晴らしいことです。

4

1 に答える 1