3

Python のマルチプロセッシングについて、このチュートリアルを参考に実装しようとしていますが、自分のタスクを実行すると次のエラーが発生します。

Traceback (most recent call last):
>>>   File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
IOError: [Errno 232] The pipe is being closed

以下は、私がやろうとしていることを再現した、同じエラーメッセージが発生するコード例です:

from multiprocessing import Lock, Process, Queue, current_process
import time

class Testclass(object):
    """Small container holding a single value, ``x``.

    Other code in this file attaches further attributes (``counter``,
    ``product``) to instances after construction.
    """

    def __init__(self, x):
        # Keep the caller-supplied value unchanged.
        self.x = x

def toyfunction(testclass):
    """Store the square of ``testclass.x`` on the object as ``product``.

    The object is modified in place and that same object is returned.
    """
    value = testclass.x
    testclass.product = value * value
    return testclass


def worker(work_queue, done_queue):
    """Consume items from ``work_queue`` until the ``'STOP'`` sentinel is read.

    For every item: print its ``counter``, run it through ``toyfunction`` and
    put the result on ``done_queue``. Any exception ends the loop and is
    reported only by printing ``'error'``. Always returns ``True``.
    """
    try:
        while True:
            item = work_queue.get()
            # Sentinel on the left: same operand order iter(callable, sentinel) uses.
            if 'STOP' == item:
                break
            print(item.counter)
            done_queue.put(toyfunction(item))
    except:
        # Catches everything, so the cause of the failure is not shown.
        print('error')
    return True

def main():
    """Reproduction case: push 1000 ``Testclass`` objects through 8 workers.

    Builds the inputs, loads them onto ``work_queue``, starts the worker
    processes, joins them and prints the elapsed time. Nothing in this
    function reads the results that the workers put on ``done_queue``.
    """

    counter = 1

    # Build 1000 Testclass(3) inputs. The value printed is the counter *after*
    # incrementing, i.e. 2..1001.
    database = []
    while counter <= 1000:
        database.append(Testclass(3))
        counter += 1
        print(counter)



    workers = 8
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    # NOTE(review): time.clock() was removed in Python 3.8; the traceback in
    # the question shows Python 2.7, where it is still available.
    start = time.clock()
    counter = 1

    # Number every item (1..1000) and queue it for the workers.
    for testclass in database:
        testclass.counter = counter
        work_queue.put(testclass)
        counter += 1
        print(counter)


    print('items loaded')
    # Start the workers; one 'STOP' sentinel is queued per worker so that each
    # worker's loop terminates.
    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    # NOTE(review): the workers are joined while their results are still
    # sitting unread in done_queue. The multiprocessing programming guidelines
    # warn against joining a process before the items it queued have been
    # consumed; presumably this is related to the reported pipe error --
    # TODO confirm.
    for p in processes:
        p.join()

    # Sentinel placed after the results; no code here consumes done_queue.
    done_queue.put('STOP')

    print(time.clock()-start)
    print("Done")

# Run main() only when this file is executed as a script, not when imported.
# NOTE(review): multiprocessing on Windows imports the main module in each
# child process, so this guard is presumably required here -- see the
# multiprocessing programming guidelines.
if __name__ == '__main__':
    main()    
4

回答 2 件

3

イベントを使用してプロセスを正常に終了した後、キューを空にすることでこれを回避しました。

# Fragment from inside a method; the enclosing class is not shown here.
# Signal shutdown: per the author, the process has a timer that checks for
# this event being set and then shuts itself down.
self.event.set()
# Per the author, _q is a multiprocessing Queue used for inter-process
# communication. Discard whatever is still queued.
while not self._q.empty():
    try:
        # Short timeout so this call cannot block for long if the queue
        # turns out to be empty after all.
        self._q.get(timeout=0.001)
    except:
        # NOTE(review): bare except ignores every error, not only an empty
        # queue -- consider narrowing.
        pass
# Close the queue once the loop above has emptied it.
self._q.close()
2014-08-14T20:42:01.120 に回答
0

完了キュー (done_queue) を処理するコードを追加すると、エラーが発生しなくなりました。動作するコードは次のとおりです。

from multiprocessing import Lock, Process, Queue, current_process
import time

class Testclass(object):
    """Small container holding a single value, ``x``.

    Other code in this file attaches further attributes (``counter``,
    ``product``) to instances after construction.
    """

    def __init__(self, x):
        # Keep the caller-supplied value unchanged.
        self.x = x

def toyfunction(testclass):
    """Store the square of ``testclass.x`` on the object as ``product``.

    The object is modified in place and that same object is returned.
    """
    value = testclass.x
    testclass.product = value * value
    return testclass


def worker(work_queue, done_queue):
    """Process items from ``work_queue`` until the ``'STOP'`` sentinel arrives.

    Each item has its ``counter`` printed, is passed through ``toyfunction``
    and the result is put on ``done_queue``.

    Args:
        work_queue: queue-like object whose ``get()`` yields work items and
            finally the string ``'STOP'``.
        done_queue: queue-like object that receives results via ``put()``.

    Returns:
        bool: always ``True``, also after a failure has been reported.
    """
    import traceback  # local import keeps this block self-contained

    try:
        for testclass in iter(work_queue.get, 'STOP'):
            print(testclass.counter)
            done_queue.put(toyfunction(testclass))
    except Exception:
        # Previously a bare ``except:``, which also swallowed
        # KeyboardInterrupt/SystemExit and hid the cause of the failure.
        # The 'error' line is kept; the traceback is printed as well.
        print('error')
        traceback.print_exc()

    return True

def main():
    """Square 100 ``Testclass`` objects across 8 worker processes.

    Builds the inputs, loads them onto a work queue, starts the workers,
    collects every result from the done queue and prints the elapsed time.

    Returns:
        list: the processed objects in the order they arrived on the done
        queue, which need not match the order they were submitted in.

    Notes:
        * Results are read from ``done_queue`` *before* the workers are
          joined. A process that has put items on a queue does not exit
          until they have been flushed to the pipe, so joining first can
          block once the results no longer fit in the pipe buffer.
        * ``time.time()`` replaces ``time.clock()``, which was removed in
          Python 3.8.
    """
    # The Empty exception lives in ``Queue`` on Python 2 and ``queue`` on 3.
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    # Build the inputs. The printed value is the 1-based index plus one
    # (2..101), matching the original output.
    database = []
    for index in range(1, 101):
        database.append(Testclass(10))
        print(index + 1)

    workers = 8
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    start = time.time()

    # Number every item (1..100) and queue it for the workers.
    for number, testclass in enumerate(database, 1):
        testclass.counter = number
        work_queue.put(testclass)
        print(number + 1)

    print('items loaded')
    # Start the workers; one 'STOP' sentinel per worker ends each worker loop.
    for _ in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    newdatabase = []

    # Keep reading results while any worker is still running, so that no
    # worker ends up blocked on a full pipe.
    while any(p.is_alive() for p in processes):
        try:
            newdatabase.append(done_queue.get(timeout=0.1))
        except Empty:
            pass

    for p in processes:
        p.join()

    # The workers have exited, so their remaining results are already in the
    # pipe; read until the queue stays empty.
    while True:
        try:
            newdatabase.append(done_queue.get(timeout=0.1))
        except Empty:
            break

    print(time.time()-start)
    print("Done")
    return newdatabase

# Run main() only when this file is executed as a script, not when imported;
# the list it returns is kept in the module-level name ``database``.
if __name__ == '__main__':
    database = main()
2013-09-28T19:49:58.593 に回答