2

私は現在、networkx 関数 *all_simple_paths* を使用して、特定のソース ノードとターゲット ノードのセットについて、ネットワーク G 内のすべてのパスを検索しています。

大規模で高密度のネットワークでは、このプロセスは非常に集中的です。

この問題でマルチプロセッシングを使用できる可能性があるかどうか、およびプールなどの作成を通じて、それを実装する方法について誰かがアイデアを持っているかどうかを知りたい.

import networkx as nx

G = nx.complete_graph(8)
sources = [1,2]
targets = [5,6,7]

for target in targets:
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)

ご提案いただきありがとうございます。

4

2 に答える 2

3

最も単純なケースでは、パスは同じグラフの一部である以外は互いに関係がないように見えるため、ロックの問題は発生しません。

私がすることは、multiprocessingモジュールを使用して、各ループでtargetsaPoolおよびmapメソッドを使用して新しいプロセスを開始できることです。

def create_graph_from_target( target )
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)

from multiprocessing import Pool
p = Pool( processes=4 )

p.map( create_graph_from_target, targets )
p.close()
p.join()
于 2012-12-21T16:39:48.037 に答える
2

これは、ワーカープロセスのコレクションを使用するバージョンです。各ワーカーはsource, targetキューからペアを取得し、リスト内のパスを収集します。すべてのパスが見つかると、結果は出力キューに入れられ、メインプロセスによって照合されます。

import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)


def worker(inqueue, output):
    result = []
    count = 0
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target = worker, args = (inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    for proc in procs:    
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))

    return result

sentinel = None

seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))

test_workersマルチプロセッシングをtest_single_worker使用し、単一のプロセスを使用します。

実行test.pyしてもAssertionErrorは発生しないため、両方の関数が同じ結果を返すように見えます(少なくとも実行した限られたテストでは)。

timeitの結果は次のとおりです。

% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop

% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop

したがって、この場合、test_workersは2コアシステムでtest_single_workerの1.8倍のスピードアップを達成することができました。うまくいけば、コードは実際の問題に対しても適切にスケーリングされます。結果を知りたいです。


いくつかの興味深い点:

  • 短期間の関数の呼び出しpool.apply_asyncは非常に遅くなります。これは、引数の受け渡しに時間がかかりすぎて、CPUを使用して有用な計算を行うのではなく、キューを介して結果が得られるためです。
  • output結果を一度に1つずつ入れるよりも、結果をリストに集めて完全な結果をキューに入れる方がよいでしょうoutput。キューに入れられた各オブジェクトはピクルス化され、多くの小さなリストよりも1つの大きなリストをピクルス化する方が高速です。
  • 1つのプロセスからのみ印刷する方が安全だと思います。そのため、printステートメントは互いにステップしません(出力が壊れてしまいます)。
于 2012-12-21T17:55:57.457 に答える