他のタスクを実行するスレーブプロセスを管理するマスタープロセスを備えたシステムを実装する必要があります。2つの異なるスレーブタイプがあり、各スレーブの6つのインスタンスが必要です。動作するものを作成しましたが、タスクが完了すると、各プロセスが強制終了され、新しいプロセスが開始されます。新しいプロセスの生成にはコストがかかるため、これは望ましくありません。各スレーブをプロセスとして実行し続け、完了したときに通知を受け取り、新しい入力で再度実行したいと思います。
私の現在の疑似っぽいコードは以下の通りです。完璧ではありません。私は実際のコードを持っていないので、それを翼にしています。
# SlaveTypeB is pretty much the same.
class SlaveTypeA(multiprocessing.Process):
def __init__(self, val):
self.value = val
self.result = multiprocessing.Queue(1)
self.start()
def run(self):
# In real life, run does something that takes a few seconds.
sleep(2)
# For SlaveTypeB, assume it writes self.val to a file instead of incrementing
self.result.put(self.val + 1)
def getResult(self):
return self.result.get()[0]
if __name__ == "__main__":
MAX_PROCESSES = 6
# In real life, the input will grow as the while loop is being processed
input = [1, 4, 5, 6, 9, 6, 3, 3]
aProcessed = []
aSlaves = []
bSlaves = []
while len(input) > 0 or len(aProcessed) > 0:
if len(aSlaves) < MAX_PROCESSES and len(input) > 0:
aSlaves.append(SlaveTypeA(input.pop(0))
if len(bSlaves) < MAX_PROCESSES and len(aProcessed) > 0 :
bSlaves.append(SlaveTypeB(aProcesssed.pop(0))
for aSlave in aSlaves:
if not aSlave.isAlive():
aProcessed = aSlave.getResult()
aSlaves.remove(aSlave)
for bSlave in bSlaves:
if not bSlave.isAlive():
bSlaves.remove(bSlave)
aSlavesとbSlavesのプロセスが強制終了されたり、再生成されたりしないようにするには、どうすればよいですか。パイプを使用できると思っていますが、プロセスがブロックされたときに、待つ必要がないことをどのように判断できるかわかりません。
編集 私はパイプを使用してこれを書き直しました、そしてそれはプロセスを実行し続けることができないという私の問題を解決しました。それでも、これを行うための最良の方法についての意見を求めています。ワーカータイプが1つしかないため、問題が単純化されるため、slaveBの部分は省略しました。
class Slave(Process)
def __init__(self, id):
# Call super init, set id, set idlestate = true, etc
self.parentCon, self.childCon = Pipe()
self.start()
def run(self):
while True:
input = self.childCon.recv()
# Do something here in real life
sleep(2)
self.childCon.send(input + 1)
#def isIdle/setIdle():
# Getter/setter for idle
def tryGetResult(self):
if self.parentCon.poll():
return self.parentCon.recv()
return False
def process(self, input):
self.parentConnection.send(input)
if __name__ == '__main__'
MAX_PROCESSES = 6
jobs = [1, 4, 5, 6, 9, 6, 3, 3]
slaves = []
for int i in range(MAX_PROCESSES):
slaves.append(Slave(i))
while len(jobs) > 0:
for slave in slaves:
result = slave.tryGetResult()
if result:
# Do something with result
slave.setIdle(True)
if slave.isIdle():
slave.process(jobs.pop())
slave.setIdle(False)
編集2 了解しました。以下の回答を参照してください。