30

私は Python の機能を 1 時間以上いじり、andmultiprocessingを使用してかなり複雑なグラフ トラバーサル関数を並列化しようとしています。multiprocessing.Processmultiprocessing.Manager

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel=True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if 'memorizedPaths exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []

    if cutoff < 1:
        return

    visited = [source]
    stack = [iter(DG[source])]

    while stack:
        children = stack[-1]
        child = next(children, None)

        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and
                    (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))

                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: # len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and
                (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)

    memorizedPaths[source] = uniqueTreePaths

############################################################################

if __name__ == '__main__':
    start = time.clock()

    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()

end = time.clock()
print (end-start)

現在 - 運と魔法ですが - 動作します (ある程度)。私の問題は、24 個のコアのうち 12 個しか使用していないことです。

誰かがなぜこれが当てはまるのか説明できますか? おそらく、私のコードは最適なマルチプロセッシング ソリューションではありませんか、それともUbuntu 13.04 x64 で実行されている Intel Xeon CPU E5-2640 @ 2.50GHz x18アーキテクチャの機能ですか?

編集:

私はなんとか手に入れました:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()

動作していますが、非常に遅いです! そのため、仕事に間違った機能を使用していると思います。うまくいけば、私が達成しようとしていることを正確に明確にするのに役立ちます!

EDIT2:.map試行:

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

動作しますが、シングルコアよりも遅いです。最適化する時が来ました!

4

1 に答える 1

58

コメントで対処するにはここに山が多すぎるので、どこにあるのmpですかmultiprocessing

mp.cpu_count()プロセッサの数を返す必要があります。しかし、それをテストしてください。一部のプラットフォームはファンキーであり、この情報は常に簡単に入手できるとは限りません。Python はできる限りのことを行います。

24 個のプロセスを開始すると、指定したとおりに実行されます ;-)mp.Pool()が最も便利なようです。作成するプロセスの数をそのコンストラクターに渡します。 プロセッサの数にmp.Pool(processes=None)使用されます。mp.cpu_count()

次に、たとえばインスタンスで使用して、.imap_unordered(...)プロセス全体Poolに分散させることができdegreelistます。または、他のPool方法がうまくいくかもしれません-実験してください。

Pool問題をの世界観に打ち込むことができない場合は、代わりにmp.Queueを作成して作業キューを作成し、.put()ノード (またはオーバーヘッドを減らすためにノードのスライス) を作成してメイン プログラムで作業し、次のように記述します。ワーカーは.get()そのキューからアイテムを処理します。例が必要かどうか尋ねてください。すべての「実際の」作業項目の後に、センチネル値 (プロセスごとに 1 つ) をキューに入れる必要があることに注意してください。これにより、ワーカー プロセスは、センチネルがいつ完了したかをテストできるようになります。

参考までに、キューはより明示的であるため、キューが好きです。他の多くの人は、より魔法的であるため、 の方が好きPoolです ;-)

プールの例

これが実行可能なプロトタイプです。これはimap_unorderedwithを使用する 1 つの方法を示してPoolおりchunksize、関数のシグネチャを変更する必要はありません。もちろん、実際のコードをプラグインする必要がありinit_workerますdegreeslist。プロセス間通信の量を削減することは、速度にとって非常に重要です。

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

これをそのまま実行することを強くお勧めします。そうすれば、非常に高速であることがわかります。次に、何かを少しずつ追加して、それが時間にどのように影響するかを確認します。たとえば、追加するだけです

   memorizedPaths[item] = item

それを大幅に_all_simple_paths_graph()遅くします。なんで?dict は追加するたびにどんどん大きくなり、このプロセスセーフな dict はすべてのプロセス間で (カバーの下で) 同期する必要があるためです。同期の単位は「辞書全体」です。共有辞書の増分更新を行うために mp 機構が利用できる内部構造はありません。

この費用を支払う余裕がない場合、Manager.dict()これに a を使用することはできません。賢くなるチャンスはたくさんあります ;-)

于 2013-09-30T16:05:28.257 に答える