0

同期がキューからジョブを取得することだけである限り、任意の数のコアを 100% の使用率にする一連の CPU バウンド プロセスがあります。

ファイル システムのディレクトリを更新するときに最悪のシナリオを回避するために RLock を追加するとすぐに、プロセスが IO バウンドになったかのように、CPU/コアの使用率が 60% に低下します。

説明は何ですか?

これは全体的な速度に関するものではありません。これは CPU/コアの使用率に関するものなので、Python 2/3、Cython、または PyPy は関係ありません。

更新:自分の質問に部分的な回答をしました。私の特定のケースの最終的な解決策は、ファイルシステムへのアクセス方法を変更して、同期が不要になるようにすることでした(「一種の」マップ/リデュース)。

4

2 に答える 2

1

それはすべて、どのようmultiprocessingに実装されたかによって異なりますRLock。マルチプロセッシングはホスト間で機能する可能性があることを認識してます。これは、同期プリミティブがソケット間で機能する可能性があることを意味します。それが本当なら、それは多くの(可変の)レイテンシーを導入するでしょう。

それで私は実験をしました。

RLockこれは、複数のプロセスで使用されているうっとうしい例です(すべてのロックが同じプロセス内にある高速パスを防ぐため)。

#!/usr/bin/env python
import multiprocessing
from time import sleep

lock = multiprocessing.RLock()

def noop(myname):
    # nonlocal lock
    sleep(0.5)
    print myname, "acquiring lock"
    with lock:
        print myname, "has lock"
        sleep(0.5)
    print myname, "released lock"

sProc1 = multiprocessing.Process(target=noop, args=('alice',))
sProc2 = multiprocessing.Process(target=noop, args=('bob',))

sProc1.start()
sProc2.start()

sProc1.join()
sProc2.join()

これを実行すると、出力は次のようになります。

alice acquiring lock
alice has lock
bob acquiring lock
alice released lock
bob has lock
bob released lock

すばらしいので、 straceを介したシステムコールトレースで実行します。

以下のコマンドでは、-ffオプションはツールにfork()呼び出しを「追跡」するように指示します。つまり、メインのプロセスによって開始されたプロセスをトレースします。簡潔にするために、これも使用して-e trace=futex,writeいます。これは、これを投稿する前に行った結論に基づいて出力をフィルタリングします。通常、オプションなしで実行し-e、テキストエディタ/を使用してgrep事後に何が起こったかを調べます。

# strace -ff -e trace=futex,write ./traceme.py
futex(0x7fffeafe29bc, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 1, NULL, 7fb92ac6c700) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7fb92a8540b0, FUTEX_WAKE_PRIVATE, 2147483647) = 0
futex(0x7fb92aa7131c, FUTEX_WAKE_PRIVATE, 2147483647) = 0
write(3, "\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 32) = 32
Process 25873 attached
Process 25874 attached
Process 25872 suspended
[pid 25873] write(1, "alice acquiring lock\n", 21alice acquiring lock
) = 21
[pid 25873] write(1, "alice has lock\n", 15alice has lock
) = 15
[pid 25874] write(1, "bob acquiring lock\n", 19bob acquiring lock
) = 19
[pid 25874] futex(0x7fb92ac91000, FUTEX_WAIT, 0, NULL <unfinished ...>
[pid 25873] futex(0x7fb92ac91000, FUTEX_WAKE, 1 <unfinished ...>
[pid 25874] <... futex resumed> )       = 0
[pid 25873] <... futex resumed> )       = 1
[pid 25874] write(1, "bob has lock\n", 13 <unfinished ...>
bob has lock
[pid 25873] write(1, "alice released lock\n", 20 <unfinished ...>
alice released lock
[pid 25874] <... write resumed> )       = 13
[pid 25873] <... write resumed> )       = 20
Process 25872 resumed
Process 25873 detached
[pid 25872] --- SIGCHLD (Child exited) @ 0 (0) ---
Process 25872 suspended
[pid 25874] write(1, "bob released lock\n", 18bob released lock
) = 18
Process 25872 resumed
Process 25874 detached
--- SIGCHLD (Child exited) @ 0 (0) ---

ブロックして後で再開するprint(write())メッセージと呼び出しのパターンから、、または「FastUserspaceMutex」を使用して実装されていることは明らかです。名前が示すように、これは同期に適した選択です。futexRLockfutex

プロセスがシステムコールでブロックされている場合、プロセスがfutexすべての目的と目的でI/Oをブロックしているように呼び出します。

これはすべて、それmultiprocessing.RLockが効率的であり、設計されたとおりに実行されていることを意味します。したがって、同期を使用したときにアプリケーションのパフォーマンスが予想よりも低い場合は、アルゴリズムが原因である可能性があります。

于 2013-03-09T15:32:53.583 に答える
0

測定値は、RLockI/O バウンドではないことを示しています。RLock(または) と同期するコードは、同期しSemaphoreないコードよりも約 6 倍遅いように見えますが、最も単純な I/O を実行するコードは数桁遅くなります。

RLock以下は、 Python と PyPy の両方でオーバーヘッドを測定するために使用した単一プロセス コードです。1 つのプロセスで のオーバーヘッドが非常に高い理由、またはオーバーヘッドが CPU/コアの使用率を維持しない理由はまだわかりRLockませんが、結果は、標準の同期プリミティブを使用する方が独自のプロセスを実行しようとするよりもはるかに効率的であることを示しています。 .

# lock.py
import sys
import os
import timeit
from random import random
from multiprocessing import RLock, Semaphore

N=8*1024*1024

lock = RLock()
semaphore = Semaphore()

def touch(fname, times=None):
    if not os.path.isfile(fname):
        open(fname, 'wa').close()
    else:
        with file(fname, 'a'):
            os.utime(fname, times)

def wolock():
    return random()

def wlock():
    with lock:
        return random()

def wsem():
    with semaphore:
        return random()

def wfile():
   os.path.isfile('lock')
   touch('lock') 
   try:
        return random()
   finally:
        os.unlink('lock')

def run(get):
    result = []
    for i in xrange(N):
        result.append(get())
    return result

def main():
    t0 = timeit.timeit('lock.wolock()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t0, 100, 'unsynchronized')
    t1 = timeit.timeit('lock.wlock()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t1, 100*t1/t0, 'rlock')
    t2 = timeit.timeit('lock.wsem()', setup='import lock', number=N)
    print '%8.3f %8.2f%% %s' % (t2, 100*t2/t0, 'semaphore')
    t = timeit.timeit('lock.wfile()', setup='import lock', number=N)
    print '%8.3f %s' % (t, 'file system')

if __name__ == '__main__':
    main()
于 2013-03-09T13:47:54.197 に答える