3

私は最近、マルチプロセッシングを使用してネットワーク パスを見つけるという質問を投稿しました。

test_workers()ただし、 (マルチプロセッシングを使用して)機能を実行するときに問題が発生しました。Nコードは実行されるが、ネットワーク内の多数のノードでハングするG

Mac OS X Lion 10.7.5 -- python 2.7 を使用して実行すると、N>500 のときにハングします。ロギングにより、次のメッセージが表示された後、ハングします

[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()

Windows 7 で VMware Fusion を使用して実行すると、より大きなネットワークが容易になりますが、最終的には N > 20,000 ノード付近のグラフでハングします (理想的には、N = 500,000 までのネットワークでこれを使用したいと考えています)。ぶら下がっている窓側からのメッセージ:

[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()[DEBUG/MainProcess] telling queue thread to quit
Traceback (most recent call last):
      File "C:\Users\Scott\Desktop\fp_test.py", line 75, in <module>
    Traceback (most recent call last):
          File "C:\Python27\lib\multiprocessing\queues.py", line 264, in _feed
    test_workers()
    MemoryError

なぜこれが起こっているのかについて誰かが何か考えを持っているのだろうかと思いましたか? 大規模なネットワークでこれを機能させる方法について何か提案があれば教えてください。

あなたが持っているかもしれない提案を前もって感謝します。

@unutbu のコード:

import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)


def worker(inqueue, output):
    result = []
    count = 0
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target = worker, args = (inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    for proc in procs:    
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))

    return result

sentinel = None

seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))
4

1 に答える 1

2

loggingモジュールでデッドロックが発生しました。

このモジュールは、スレッド間で安全にログを記録できるようにいくつかのスレッド ロックを保持しますが、現在のプロセスが fork されている場合はうまく機能しません。何が起こっているかの説明については、たとえばここを参照してください。

解決策は、呼び出しを削除するか、代わりにloggingプレーン s を使用することです。print

とにかく、原則として、スレッド + フォークの使用は避けてください。また、どのモジュールが裏でスレッドを使用しているかを常に確認してください。

Windows では単純に機能することに注意してください。これは、Windows にforkはロック クローンがなく、その後のデッドロックによるロック クローンの問題がないためです。その場合、MemoryErrorプロセスが RAM を消費しすぎていることを示します。RAM の使用量を減らすには、おそらくアルゴリズムを再考する必要がありますが、OSX で発生している問題とはまったく異なります。

于 2012-12-30T15:23:24.487 に答える