2

私は以下のコードを実行しており、正常に動作していますが、別のプロセスに生成されていません. そして、私は4つのCPUマシンを使用しています。このコードの何が問題になっていますか?

def f(values):
    print(multiprocessing.current_process())
    return values

def main():
    p = Pool(4) #number of processes = number of CPUs
    keys, values= zip(*data.items()) #ordered keys and values
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    p.close() # no more tasks
    p.join()  # wrap up current tasks

そして結果は

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

また、時にはこんな風に、

<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>

時々、

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

そして私の質問は、どのような基準で機能をワーカーに割り当てるのですか? ディクショナリ内のキーの数に基づいてプロセスの数を決定する方法でコードを記述しています (データのキーは常に CPU よりも少ないことを考慮して)。私のコードは次のように始まります-メインコードはファイルを読み取り、単一のプロセスを使用して辞書を作成し、それを複数の同時プロセスに分岐し、それらがデータを処理するのを待つ必要があります(私はそのためにpool.mapを使用しています)、次に子プロセスの結果を取得すると、それらの処理を開始します。この親が子プロセスステップを待機するようにするにはどうすればよいですか?

4

1 に答える 1

5

コードに問題はありません。あなたの作業項目は非常に高速です。同じワーカー プロセスが関数を実行し、結果を返し、競合に勝って、作業multiprocessing.Poolを分散するために使用する内部キューから次のタスクを消費する可能性があります。を呼び出すmapと、作業項目がバッチに分割され、Queue. pool.mapiterable のアイテムをチャンクアップしてキューに入れる実装の一部を次に示します。

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 
                          for i, x in enumerate(task_batches)), None))

各ワーカー プロセスは、そのキューから項目を消費する無限 while ループを持つ関数を実行します*:

while maxtasks is None or (maxtasks and completed < maxtasks):
    try:
        task = get()  # Pulls an item off the taskqueue
    except (EOFError, IOError):
        debug('worker got EOFError or IOError -- exiting')
        break

    if task is None:
        debug('worker got sentinel -- exiting')
        break

    job, i, func, args, kwds = task
    try:
        result = (True, func(*args, **kwds))  # Runs the function you passed to map
    except Exception, e:
        result = (False, e)
    try:
        put((job, i, result))  # Sends the result back to the parent
    except Exception as e:
        wrapped = MaybeEncodingError(e, result[1])
        debug("Possible encoding error while sending result: %s" % (
            wrapped))

同じワーカーがたまたまアイテムを消費し、 を実行funcして、次のアイテムを消費できた可能性があります。これやや奇妙です-あなたの例と同じコードを実行しているマシンで再現することはできません-しかし、同じワーカーがキューから4つのアイテムのうち2つを取得するのはごく普通のことです。

への呼び出しを挿入してワーカー関数の実行時間を長くすると、常に均一な分布が見られるはずですtime.sleep

def f(values):
    print(multiprocessing.current_process())
    time.sleep(1)
    return values

* これは実際にはまったく正しくありません - から消費するメイン プロセスで実行されるスレッドがあり、taskqueueそれが引き抜いたものを別の に貼り付けますQueue。それが子プロセスが消費するものです)

于 2014-09-18T00:30:29.163 に答える