0

このコードの例を見てください:

def get_hash(path, hash_type='md5'):
    func = getattr(hashlib, hash_type)()
    f = os.open(path, (os.O_RDWR | os.O_BINARY))
    for block in iter(lambda: os.read(f, 1024*func.block_size), b''):
        func.update(block)
    os.close(f)
    return func.hexdigest()

この関数は、任意のファイルの md5sum を返します。30 個を超えるファイルを含むディレクトリがあり、各ファイルに対してハッシュ関数を実行したいとします。

def hasher(path=some_path):
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            yield get_hash(path)
@some_timer_decorator
... some testing function here ...

test1 took 4.684999942779541 seconds.

hasherご覧のとおり、目前の状況により、関数を「活用」してマルチプロセッシングを追加する機会が得られます。

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            yield p.apply_async(get_hash, (full_name,)).get()
@some_timer_decorator
... some other testing function here ...

test2 took 4.781000137329102 seconds.

出力は同じです。ほとんどのファイルは 20MB 未満であり、hasher関数はこれらの合計を非常に高速に計算するため (一般に、そのサイズのファイルの場合)、並列バージョンの方がはるかに高速であると予想していました。私の実装に何か問題がありますか?何も問題がなければ、同じ問題へのより速いアプローチはありますか?

#

これは、実行時間を測定するために使用したデコレーター関数です。

def hasher_time(f):
        def f_timer(*args, **kwargs):
            start = time.time()
            result = f(*args, **kwargs)
            end = time.time()
            print(f.__name__, 'took', end - start, 'seconds')
            return result
        return f_timer
#
4

1 に答える 1

3

ジョブをプッシュして、完了するのを待ちます:

yield p.apply_async(get_hash, (full_name,)).get()

AsyncResult.get()ジョブが完了するまでメソッドブロックされ、ジョブを効果的に順番に実行しています。

AsyncResult.ready()ジョブを収集し、完了するまでポーリングしてから、結果を取得 .get()します。

.async_apply()さらに良いのは、呼び出しですべてのジョブをプールにプッシュしてから、プールを閉じて呼び出し.join()(すべてのジョブが完了するまでブロックします)、次のコマンドで結果を取得すること.get()です。

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    jobs = []
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            jobs.append(p.apply_async(get_hash, (full_name,)))
    p.close()
    p.join()  # wait for jobs to complete
    for job in jobs:
        yield job.get()

Pool.imap()このメソッドを使用して、コードをいくらか単純化できます。利用可能になると結果が得られます。

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    filenames = (os.path.join(root, name)
        for root, dirs, files in os.walk(path, topdown=False)
        for name in files)
    for result in p.imap(get_hash, filenames):
        yield result

ただし、chunksizeパラメータと順序付けされていないバリアントも試してください。

于 2014-03-30T00:53:40.750 に答える