64

プール内の特定のワーカーによって実行されているジョブがどのワーカーが実行しているのかを知ることができるように、Pythonマルチプロセッシングプール内の各ワーカーに一意のIDを割り当てる方法はありますか?ドキュメントによるとProcessname

名前は、識別目的でのみ使用される文字列です。セマンティクスはありません。複数のプロセスに同じ名前を付けることができます。

私の特定のユースケースでは、4つのGPUのグループで一連のジョブを実行し、ジョブを実行するGPUのデバイス番号を設定する必要があります。ジョブの長さが不均一であるため、前のジョブが完了する前に、実行しようとしているジョブのGPUで衝突が発生しないようにします(これにより、IDをGPUに事前に割り当てることができなくなります)事前の作業単位)。

4

5 に答える 5

103

あなたが望むものは単純なようです:multiprocessing.current_process()。例えば:

import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

出力:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]

これにより、プロセスオブジェクト自体が返されるため、プロセスを独自のIDにすることができます。一意の数値IDを要求することもできますid。cpythonでは、これはプロセスオブジェクトのメモリアドレスであるため、重複する可能性はないと思います。ident最後に、プロセスのまたはプロパティを使用できますが、これpidはプロセスが開始された後にのみ設定されます。

さらに、ソースを見ると、自動生成された名前(Process上記のrepr文字列の最初の値で例示されている)が一意である可能性が非常に高いようです。すべてのプロセスのオブジェクトをmultiprocessing維持します。これは、生成する子プロセスのタプルを生成するために使用されます。したがって、トップレベルのプロセスは単一値のIDを持つ子プロセスを生成し、2つの値のIDを持つプロセスを生成します。次に、コンストラクターに名前が渡されない場合は、。を使用して、_identityに基づいて名前を自動生成します。次に、を使用してプロセスの名前を変更し、自動生成されたIDを同じままにします。itertools.counter_identityProcess':'.join(...)Pool replace

これらすべての結果として、2つのProcesses同じ名前を持つ場合がありますが、作成時に同じ名前を割り当てることができるため、nameパラメーターに触れない場合は一意になります。_identityまた、理論的には一意の識別子として使用できます。しかし、私は彼らがその変数を理由でプライベートにしたことを収集します!

上記の動作例:

import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

出力:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]
于 2012-04-17T13:53:11.433 に答える
7

を使用multiprocessing.QueueしてIDを保存し、プールプロセスの初期化時にIDを取得できます。

利点:

  • 内部に依存する必要はありません。
  • ユースケースがリソース/デバイスの管理である場合は、デバイス番号を直接入力できます。これにより、デバイスが2回使用されないことも保証されます。プールにデバイスよりも多くのプロセスがある場合、追加のプロセスがブロックされqueue.get()、作業は実行されません(これにより、ポーグラムがブロックされないか、少なくとも次の場合はブロックされませんでした。私はテストしました)。

短所:

  • 追加の通信オーバーヘッドがあり、プールプロセスの生成には少し時間がかかります。sleep(1)この例では、他のプロセスはまだ初期化されていないため、すべての作業が最初のプロセスによって実行される可能性があります。
  • グローバルが必要です(または少なくともそれを回避する方法がわかりません)

例:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))

出力:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]

プールには8つのプロセスが含まれ、1つのidxは1つのプロセスでのみ使用されますが、pidは4つしかないことに注意してください。

于 2017-03-15T18:30:39.020 に答える
1

私はこれをスレッド化で行い、最終的にはキューを使用してジョブ管理を処理しました。これがベースラインです。私の完全なバージョンにはたくさんありますtry-catches(特にワーカーでは、q.task_done()失敗した場合でも呼び出されるようにします)。

from threading import Thread
from queue import Queue
import time
import random


def run(idx, *args):
    time.sleep(random.random() * 1)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

マルチプロセッシングを使用する必要はありませんでしたが(私のワーカーは外部プロセスを呼び出すためだけのものです)、これは拡張できます。マルチプロセッシング用のAPIはそれを少し変えます、これがあなたがどのように適応することができるかです:

from multiprocessing import Process, Queue
from Queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * i)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes: 
        p.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

どちらのバージョンも次のように出力されます。

0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)
于 2017-06-05T15:10:25.587 に答える
0

それがどのように機能するかはわかりませんPoolが、印刷するとProcessいくつかのユニークな出力が得られます。

x = Process(target=time.sleep, args=[20])
x.start()
print(x)  # <Process name='Process-5' pid=97121 parent=95732 started>
于 2020-10-05T14:10:08.547 に答える
0

を使用して関数ハンドルを取得getattrし、ラッパーを使用して、マップされているメソッドに渡したい引数をいくつでもパックおよびアンパックすることで、クラスメソッドにマップすることができました。私の場合、プールが起動されていたのと同じクラスからメソッドを渡していましたが、オブジェクトを渡して別のクラスにマップすることもできます。

これはコードです:

import multiprocessing
from multiprocessing import Pool


def warp(args):
    func = args[0]
    frame = args[1]
    left_over = args[2:]
    func(frame, *left_over)


class MyClass:

    def __init__(self):
        self.my_flag = 5

    def exec_method(self, method, int_list, *args):
        obj = getattr(self, method.__name__)

        packed = list()
        for i in int_list:
            pack = list()
            pack.append(obj)
            pack.append(i)
            for arg in args:
                pack.append(arg)
            packed.append(pack)

        print("Start")
        pool = Pool(processes=multiprocessing.cpu_count())
        pool.map(warp, packed)
        print("End")

    def method1(self, my_str):
        print(self.my_flag, my_str)

    def method2(self, i, print_str, bool_flat):
        print(multiprocessing.current_process(), self.my_flag, i, print_str, str(bool_flat))


cls: MyClass = MyClass()
cls.my_flag = 58
cls.exec_method(cls.method2, [1, 5, 10, 20, 30], "this is a string", True)

これは出力です:

Start
<ForkProcess(ForkPoolWorker-1, started daemon)> 58 1 this is a string True
<ForkProcess(ForkPoolWorker-2, started daemon)> 58 5 this is a string True
<ForkProcess(ForkPoolWorker-4, started daemon)> 58 20 this is a string True
<ForkProcess(ForkPoolWorker-5, started daemon)> 58 30 this is a string True
<ForkProcess(ForkPoolWorker-3, started daemon)> 58 10 this is a string True
End
于 2020-11-25T08:08:25.803 に答える