4

提供されたディレクトリパターンを使用してファイルシステムを精査するPython関数を作成しました。オプションの「アクション」は各レベルで提供されます。一部のボリュームはネットワーク共有上にあり、IO のブロックを最小限に抑えたいため、マルチスレッド化を試みました。これが最も便利だったので、マルチプロセッシング Pool クラスを使用することから始めました...(真剣に、スレッド化のための Pool クラスはありませんか?) 私の関数は、提供された FS パターンを可能な限り解明し、新しいパスがなくなるまで、新しく返されたパスをプールに送信します返されます。関数とクラスを直接使用するとうまく機能するようになりましたが、別のクラスからこの関数を使用しようとすると、プログラムがハングするようです。簡単にするために、プロセスの代わりにスレッドを使用して関数を書き直し、さらに単純な ThreadPool クラスを作成しました...同じ問題。ここ'

file test1.py:
------------------------------------------------

import os
import glob
from multiprocessing import Pool

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for result in pool.map(glob.glob,paths):
        results += result
    return results

def findAllMyPaths():
    pool = Pool(10)
    paths = ['/Volumes']
    follow = ['**','ptid_*','expid_*','slkid_*']
    for pattern in follow:
        paths = mapGlob(pool,paths,pattern)
    return paths


file test2.py:
----------------------------------------------------------------------------

from test1 import findAllMyPaths

allmypaths = findAllMyPaths()

今電話したら

>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths

これはうまくいきますが、試してみると:

>>>from test2 import allmypaths

python は永久にハングアップします。アクション関数が呼び出されます (この例ではグロブ) が、返されないようです... 助けが必要です... 並列化されたバージョンは、適切に動作するとはるかに高速に実行されます (「アクション」が何であるかに応じて、6 倍から 20 倍高速になります) FS ツリーの各ポイントにマッピングされている) ので、使用できるようにしたいと思います。

また、マッピング関数を非並列バージョンに変更した場合:

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

すべてがうまくいきます。

編集:

マルチプロセッシングでのデバッグをオンにして、それがさらに役立つかどうかを確認しました。それが機能する場合、次のようになります。

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

そして、それがすべてではない場合は、次のようになります。

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
4

2 に答える 2

1

完全な解決策ではありませんが、インタープリターから、または実行中のスクリプトのコードとして、コードを機能させる方法を見つけました。問題は、マルチプロセッシング ドキュメントの次のメモに関係していると思います。

このパッケージ内の機能では、mainメソッドが子によってインポート可能である必要があります。これはプログラミング ガイドラインで説明されていますが、ここで指摘する価値があります。これは、 multiprocessing.Pool の例など、一部の例が対話型インタープリターでは機能しないことを意味します。

なぜこの制限が存在するのか、なぜインタラクティブインタープリターからプールを使用できる場合と使用できない場合があるのか​​ はわかりませんが、まあ....

それを回避するために、マルチプロセッシングを使用する可能性のあるモジュールで次のことを行います。

import __main__
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__,'__file__'):
    __SHOULD_MULTITHREAD__ = True

そのモジュール内の残りのコードは、このフラグをチェックして、プールを使用するか、並列化せずに実行するかを確認できます。これを行うと、対話型インタープリターからモジュールで並列化された関数を引き続き使用およびテストできますが、実行速度が大幅に低下します。

于 2011-01-27T20:39:56.653 に答える
0

私が間違っていなければ、test2.pyはこのように見えるべきではありません

from test1 import findAllMyPaths
allmypaths = findAllMyPaths

その後

from test2 import allmypaths  
allmypaths()
于 2011-01-25T22:18:00.770 に答える