マルチプロセッシングは python の強力なツールであり、もっと深く理解したいと思っています。通常の ロックとキューを使用するタイミングと、マルチプロセッシングマネージャーを使用してこれらをすべてのプロセス間で共有するタイミングを知りたいです。
マルチプロセッシングの 4 つの異なる条件を使用して、次のテスト シナリオを考え出しました。
プールとNO Managerの使用
プールとマネージャーの使用
個々のプロセスとNO Managerの使用
個々のプロセスとマネージャーの使用
仕事
すべての条件がジョブ機能を実行しますthe_job
。the_job
ロックによって保護されたいくつかの印刷で構成されています。さらに、関数への入力は単純にキューに入れられます (キューから復元できるかどうかを確認するため)。この入力は、(一番下に表示されている)と呼ばれるメイン スクリプトで作成された単純なインデックスidx
です。range(10)
start_scenario
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' % idx
print who
lock.release()
queue.put(idx)
条件の成功は、キューからの入力を完全にリコールすることとして定義されますread_queue
。下部の関数を参照してください。
状況、契約条項
条件 1 と 2 は、かなり自明です。条件 1 では、ロックとキューを作成し、これらをプロセス プールに渡します。
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
(ヘルパー関数make_iterator
は、この投稿の下部に記載されています。) 条件 1 は で失敗しRuntimeError: Lock objects should only be shared between processes through inheritance
ます。
条件 2 はかなり似ていますが、ロックとキューはマネージャの監視下にあります。
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
条件 3 では、新しいプロセスが手動で開始され、マネージャーなしでロックとキューが作成されます。
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
条件 4 は似ていますが、ここでもマネージャーを使用しています。
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
3 と 4 の両方の条件で、10 個のタスクのそれぞれに対して新しいプロセスを開始the_job
し、最大でncoresプロセスが同時に動作します。これは、次のヘルパー関数で実現されます。
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
結果
条件 1 のみが失敗し ( RuntimeError: Lock objects should only be shared between processes through inheritance
)、他の 3 つの条件は成功します。私はこの結果に頭を悩ませようとします。
プールはすべてのプロセス間でロックとキューを共有する必要があるのに、条件 3 の個々のプロセスは共有しないのはなぜですか?
私が知っているのは、プール条件 (1 と 2) では、イテレーターからのすべてのデータが酸洗を介して渡されるのに対し、単一プロセス条件 (3 と 4) では、イテレーターからのすべてのデータがメイン プロセスからの継承によって渡されることです (私はLinuxを使用)。子プロセス内からメモリが変更されるまで、親プロセスが使用するのと同じメモリがアクセスされると思います(コピーオンライト)。しかしlock.acquire()
、これは変更する必要があり、子プロセスはメモリ内の別の場所に配置された別のロックを使用しますよね? 1 つの子プロセスは、マネージャーを介して共有されていない兄弟がロックをアクティブにしたことをどのように知るのでしょうか?
最後に、条件 3 と 4 がどれだけ異なるかという私の質問に多少関連があります。どちらも個別のプロセスを持っていますが、マネージャーの使用法が異なります。どちらも有効なコードと見なされますか? それとも、実際にマネージャーが必要ない場合は、マネージャーの使用を避けるべきですか?
完全なスクリプト
すべてをコピーして貼り付けてコードを実行したい人のために、完全なスクリプトを次に示します。
__author__ = 'Me and myself'
import multiprocessing as mp
import time
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' % idx
print who
lock.release()
queue.put(idx)
def read_queue(queue):
"""Turns a qeue into a normal python list."""
results = []
while not queue.empty():
result = queue.get()
results.append(result)
return results
def make_iterator(args, lock, queue):
"""Makes an iterator over args and passes the lock an queue to each element."""
return ((arg, lock, queue) for arg in args)
def start_scenario(scenario_number = 1):
"""Starts one of four multiprocessing scenarios.
:param scenario_number: Index of scenario, 1 to 4
"""
args = range(10)
ncores = 3
if scenario_number==1:
result = scenario_1_pool_no_manager(the_job, args, ncores)
elif scenario_number==2:
result = scenario_2_pool_manager(the_job, args, ncores)
elif scenario_number==3:
result = scenario_3_single_processes_no_manager(the_job, args, ncores)
elif scenario_number==4:
result = scenario_4_single_processes_manager(the_job, args, ncores)
if result != args:
print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
else:
print 'Scenario %d successful!' % scenario_number
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
def main():
"""Runs 1 out of 4 different multiprocessing scenarios"""
start_scenario(1)
if __name__ == '__main__':
main()