をピクルすることはできないPool
ため、ワーカーがタスクを追加できるようにする場合は、回避策を見つける必要があります。
メインプログラムに新しいタスクを追加するように指示する特定の「センチネル」戻り値を使用できますPool
。
while True:
ret_value = queue.get()
if is_sentinel(ret_value):
pool.apply_asynch(*get_arguments(ret_value))
yield ret_value
Whereは、戻り値が にさらにジョブを追加する必要がある場合はいつでも戻りますis_sentinel
。は、 に渡される引数を取得できる関数です。True
Pool
get_arguments
Pool
このような関数の最も単純な実装は次のようになります。
def is_sentinel(value):
"""Assume only sentinel values are tuples, or sequences."""
return isinstance(value, tuple)
# or, using duck-typing
try:
len(value)
except TypeError:
return False
else:
return True
def get_arguments(value):
"""Assume that the sentinel value *is* the tuple of arguments
to pass to the Pool.
"""
return value
# or return value[1:] if you want to include a "normal" return value
渡された関数は、新しいタスクを追加する場合にのみapply_asynch
a tuple
(またはシーケンス)を返します。この場合、戻り値は提供されません。戻り値も提供する可能性を追加するのは非常に簡単です (たとえば、タプルの最初の要素は「通常の」戻り値になる可能性があります)。
別のアプローチとして、ワーカーがリクエストを送信できる 2 番目のキューを使用することもできます。各反復で、このメソッドを使用してget_nowait()
、ワーカーがキューにさらにジョブを追加するように要求したかどうかを確認できます。
最初のアプローチを使用した例:
def is_sentinel(value):
return isinstance(value, tuple)
def get_arguments(value):
return value
def integers(queue, n1, n2):
print("integers(%d)" % n1)
queue.put(n1)
if n1 < n2:
queue.put((integers, (queue, n1+1, n2)))
def start():
pool = multiprocessing.Pool()
queue = multiprocessing.Queue()
m = 0
n = 100
pool.apply_asynch(integers, (queue, m, n))
while True:
ret_val = queue.get()
if is_sentinel(ret_val):
pool.apply_asynch(*get_arguments(ret_val))
else:
yield ret_val
2番目のアプローチを使用した例:
# needed for queue.Empty exception
import queue
def integers(out_queue, task_queue, n1, n2):
print("integers(%d)" % n1)
out_queue.put(n1)
if n1 < n2:
task_queue.put((n1+1, n2))
def start():
pool = multiprocessing.Pool()
out_queue = multiprocessing.Queue()
task_queue = multiprocessing.Queue()
task_queue.put((0, 100))
while True:
try:
# may be safer to use a timeout...
m, n = task_queue.get_nowait()
pool.apply_asynch(integers, (out_queue, task_queue, m, n))
except queue.Empty:
# no task to perform
pass
yield out_queue.get()