1

によって実行されるタスクがあるとしますmultiprocessing.Pool。このタスクが新しいタスクを実行に追加できるようにするにはどうすればよいPoolですか? 例えば、

def integers(pool, queue, n1, n2):
  print ("integers(%d)" % n)
  queue.put(n)
  pool.apply_async(integers, (pool, queue, n+1))  # crashes; can't pickle `pool`

def start():
  pool  = multiprocessing.Pool()
  queue = multiprocessing.Queue()
  integers(pool, queue, 1)
  while True:
    yield queue.get()
4

1 に答える 1

2

をピクルすることはできないPoolため、ワーカーがタスクを追加できるようにする場合は、回避策を見つける必要があります。

メインプログラムに新しいタスクを追加するように指示する特定の「センチネル」戻り値を使用できますPool

while True:
    ret_value = queue.get()
    if is_sentinel(ret_value):
        pool.apply_asynch(*get_arguments(ret_value))
    yield ret_value

Whereは、戻り値が にさらにジョブを追加する必要がある場合はいつでも戻りますis_sentinel。は、 に渡される引数を取得できる関数です。TruePoolget_argumentsPool

このような関数の最も単純な実装は次のようになります。

def is_sentinel(value):
    """Assume only sentinel values are tuples, or sequences."""
    return isinstance(value, tuple)
    # or, using duck-typing
    try:
        len(value)
    except TypeError:
        return False
    else:
        return True


def get_arguments(value):
    """Assume that the sentinel value *is* the tuple of arguments
    to pass to the Pool.
    """
    return value
    # or return value[1:] if you want to include a "normal" return value

渡された関数は、新しいタスクを追加する場合にのみapply_asyncha tuple(またはシーケンス)を返します。この場合、戻り値は提供されません。戻り値も提供する可能性を追加するのは非常に簡単です (たとえば、タプルの最初の要素は「通常の」戻り値になる可能性があります)。

別のアプローチとして、ワーカーがリクエストを送信できる 2 番目のキューを使用することもできます。各反復で、このメソッドを使用してget_nowait()、ワーカーがキューにさらにジョブを追加するように要求したかどうかを確認できます。


最初のアプローチを使用した例:

def is_sentinel(value):
    return isinstance(value, tuple)

def get_arguments(value):
    return value


def integers(queue, n1, n2):
    print("integers(%d)" % n1)
    queue.put(n1)
    if n1 < n2:
        queue.put((integers, (queue, n1+1, n2)))

def start():
    pool = multiprocessing.Pool()
    queue = multiprocessing.Queue()
    m = 0
    n = 100
    pool.apply_asynch(integers, (queue, m, n))
    while True:
        ret_val = queue.get()
        if is_sentinel(ret_val):
            pool.apply_asynch(*get_arguments(ret_val))
        else:
            yield ret_val

2番目のアプローチを使用した例:

# needed for queue.Empty exception
import queue

def integers(out_queue, task_queue, n1, n2):
    print("integers(%d)" % n1)
    out_queue.put(n1)
    if n1 < n2:
        task_queue.put((n1+1, n2))


def start():
    pool = multiprocessing.Pool()
    out_queue = multiprocessing.Queue()
    task_queue = multiprocessing.Queue()
    task_queue.put((0, 100))
    while True:
        try:
            # may be safer to use a timeout...
            m, n = task_queue.get_nowait()
            pool.apply_asynch(integers, (out_queue, task_queue, m, n))
        except queue.Empty:
            # no task to perform
            pass
        yield out_queue.get()
于 2013-09-08T07:40:22.490 に答える