これは、ワーカープロセスのコレクションを使用するバージョンです。各ワーカーはsource, target
キューからペアを取得し、リスト内のパスを収集します。すべてのパスが見つかると、結果は出力キューに入れられ、メインプロセスによって照合されます。
import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)
def worker(inqueue, output):
result = []
count = 0
for pair in iter(inqueue.get, sentinel):
source, target = pair
for path in nx.all_simple_paths(G, source = source, target = target,
cutoff = None):
result.append(path)
count += 1
if count % 10 == 0:
logger.info('{c}'.format(c = count))
output.put(result)
def test_workers():
result = []
inqueue = mp.Queue()
for source, target in IT.product(sources, targets):
inqueue.put((source, target))
procs = [mp.Process(target = worker, args = (inqueue, output))
for i in range(mp.cpu_count())]
for proc in procs:
proc.daemon = True
proc.start()
for proc in procs:
inqueue.put(sentinel)
for proc in procs:
result.extend(output.get())
for proc in procs:
proc.join()
return result
def test_single_worker():
result = []
count = 0
for source, target in IT.product(sources, targets):
for path in nx.all_simple_paths(G, source = source, target = target,
cutoff = None):
result.append(path)
count += 1
if count % 10 == 0:
logger.info('{c}'.format(c = count))
return result
sentinel = None
seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()
if __name__ == '__main__':
test_workers()
# test_single_worker()
# assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))
test_workers
マルチプロセッシングをtest_single_worker
使用し、単一のプロセスを使用します。
実行test.py
してもAssertionErrorは発生しないため、両方の関数が同じ結果を返すように見えます(少なくとも実行した限られたテストでは)。
timeitの結果は次のとおりです。
% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop
% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop
したがって、この場合、test_workersは2コアシステムでtest_single_workerの1.8倍のスピードアップを達成することができました。うまくいけば、コードは実際の問題に対しても適切にスケーリングされます。結果を知りたいです。
いくつかの興味深い点:
- 短期間の関数の呼び出し
pool.apply_async
は非常に遅くなります。これは、引数の受け渡しに時間がかかりすぎて、CPUを使用して有用な計算を行うのではなく、キューを介して結果が得られるためです。
output
結果を一度に1つずつ入れるよりも、結果をリストに集めて完全な結果をキューに入れる方がよいでしょうoutput
。キューに入れられた各オブジェクトはピクルス化され、多くの小さなリストよりも1つの大きなリストをピクルス化する方が高速です。
- 1つのプロセスからのみ印刷する方が安全だと思います。そのため、printステートメントは互いにステップしません(出力が壊れてしまいます)。