1

他のタスクを実行するスレーブプロセスを管理するマスタープロセスを備えたシステムを実装する必要があります。2つの異なるスレーブタイプがあり、各スレーブの6つのインスタンスが必要です。動作するものを作成しましたが、タスクが完了すると、各プロセスが強制終了され、新しいプロセスが開始されます。新しいプロセスの生成にはコストがかかるため、これは望ましくありません。各スレーブをプロセスとして実行し続け、完了したときに通知を受け取り、新しい入力で再度実行したいと思います。

私の現在の疑似っぽいコードは以下の通りです。完璧ではありません。私は実際のコードを持っていないので、それを翼にしています。

# SlaveTypeB is pretty much the same.
class SlaveTypeA(multiprocessing.Process):
    def __init__(self, val):
        self.value = val
        self.result = multiprocessing.Queue(1)
        self.start()
    def run(self):
        # In real life, run does something that takes a few seconds.
        sleep(2)
        # For SlaveTypeB, assume it writes self.val to a file instead of incrementing
        self.result.put(self.val + 1)
    def getResult(self):
        return self.result.get()[0]


if __name__ == "__main__":
    MAX_PROCESSES = 6
    # In real life, the input will grow as the while loop is being processed
    input = [1, 4, 5, 6, 9, 6, 3, 3]
    aProcessed = []
    aSlaves = []
    bSlaves = []

    while len(input) > 0 or len(aProcessed) > 0:
        if len(aSlaves) < MAX_PROCESSES and len(input) > 0:
            aSlaves.append(SlaveTypeA(input.pop(0))
        if len(bSlaves) < MAX_PROCESSES and len(aProcessed) > 0 :
            bSlaves.append(SlaveTypeB(aProcesssed.pop(0))
        for aSlave in aSlaves:
            if not aSlave.isAlive():
                aProcessed = aSlave.getResult()
                aSlaves.remove(aSlave)
        for bSlave in bSlaves:
            if not bSlave.isAlive():
                bSlaves.remove(bSlave)

aSlavesとbSlavesのプロセスが強制終了されたり、再生成されたりしないようにするには、どうすればよいですか。パイプを使用できると思っていますが、プロセスがブロックされたときに、待つ必要がないことをどのように判断できるかわかりません。

編集 私はパイプを使用してこれを書き直しました、そしてそれはプロセスを実行し続けることができないという私の問題を解決しました。それでも、これを行うための最良の方法についての意見を求めています。ワーカータイプが1つしかないため、問題が単純化されるため、slaveBの部分は省略しました。

class Slave(Process)
    def __init__(self, id):
        # Call super init, set id, set idlestate = true, etc
        self.parentCon, self.childCon = Pipe()
        self.start()

    def run(self):
        while True:
            input = self.childCon.recv()
            # Do something here in real life
            sleep(2)
            self.childCon.send(input + 1)

   #def isIdle/setIdle():
       # Getter/setter for idle

   def tryGetResult(self):            
       if self.parentCon.poll():
           return self.parentCon.recv()
       return False

   def process(self, input):
       self.parentConnection.send(input)

if __name__ == '__main__'
    MAX_PROCESSES = 6
    jobs = [1, 4, 5, 6, 9, 6, 3, 3]
    slaves = []
    for int i in range(MAX_PROCESSES):
        slaves.append(Slave(i))
    while len(jobs) > 0:
        for slave in slaves:
            result = slave.tryGetResult()
            if result:
                # Do something with result
                slave.setIdle(True)
            if slave.isIdle():
                slave.process(jobs.pop())
                slave.setIdle(False) 

編集2 了解しました。以下の回答を参照してください。

4

2 に答える 2

0

2つのキューを作成しますか?worktodoAキューに何かが入れられるのを待っている間、あなたの労働者をアイドル状態にworktodoBします。そこに置かれたアイテムがあれば、「やめる」と言うことができますか?

それ以外の場合は、tMCにコメントを与える必要があります

于 2012-09-26T06:40:00.143 に答える
0

SyncManagerを使用することがこの状況に最適な選択だったようです。

class Master(SyncManager):
    pass

input = [1, 4, 5, 6, 9, 6, 6, 3, 3]
def getNextInput():
    # Check that input isn't empty first
    return input.pop()

if __name__ == "__main__":
    MAX_PROCESSES = 6
    Master.register("getNextInput", getNextInput)
    m = Master(('localhost', 5000))
    m.start()
    for i in range(MAX_PROCESSES):
        Slave()
    while True:
        pass

class Slave(Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.start()
    def run(self):
        Master.register("getNextInput", getNextInput)
        m = Master(('localhost', 5000))
        m.connect()
        while True:
            input = m.getNextInput()
            # Check for None first
            self.process(input)
    def process(self):
        print "Processed " + str(input)
于 2013-10-08T19:57:13.527 に答える