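I'm processing a large amount of data in parallel, and it works correctly the first time. However, when I wrap the program in a function and call it several times with different parameters (for example, when I only need to process certain years), memory usage first doubles, then triples, and keeps growing until my PC runs out of memory.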

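I'm not sure what is going on, but when I run the following minimal example of what I'm doing, I get strange output from the multiprocessing logger. Basically, if I call the calc() function n times, the logger prints every message n times.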

import multiprocessing
import time
import logging

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


def calc():

    multiprocessing.log_to_stderr(logging.DEBUG)
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = 1
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 3
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1


if __name__ == '__main__':
    calc()
    print '--------------------------------------------'
    print 'RUNNING SECOND TIME ALL CALLS ARE DUPLICATED'
    print '--------------------------------------------'
    calc()

This is the logger output:

[DEBUG/MainProcess] created semlock with handle 140730532954112
[DEBUG/MainProcess] created semlock with handle 140730532921344
[DEBUG/MainProcess] created semlock with handle 140730532888576
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140730532855808
[DEBUG/MainProcess] created semlock with handle 140730532823040
[DEBUG/MainProcess] created semlock with handle 140730532790272
[DEBUG/MainProcess] created semlock with handle 140730532757504
[DEBUG/MainProcess] created semlock with handle 140730532724736
[DEBUG/MainProcess] created semlock with handle 140730494124032
[DEBUG/MainProcess] created semlock with handle 140730494091264
[DEBUG/MainProcess] created semlock with handle 140730494058496
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/Consumer-1] Queue._after_fork()
[DEBUG/Consumer-1] Queue._after_fork()
[INFO/Consumer-1] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-1: 0 * 0
[DEBUG/Consumer-1] Queue._start_thread()
[DEBUG/Consumer-1] doing self._thread.start()
[DEBUG/Consumer-1] starting thread to feed data to pipe
[DEBUG/Consumer-1] ... done self._thread.start()
Consumer-1: 1 * 1
Consumer-1: 2 * 2
Consumer-1: Exiting
[INFO/Consumer-1] process shutting down
[DEBUG/Consumer-1] running all "atexit" finalizers with priority >= 0
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
--------------------------------------------
RUNNING SECOND TIME ALL CALLS ARE DUPLICATED
--------------------------------------------
[DEBUG/Consumer-1] telling queue thread to quit
[DEBUG/Consumer-1] running the remaining "atexit" finalizers
[DEBUG/Consumer-1] joining queue thread
[DEBUG/Consumer-1] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/Consumer-1] ... queue thread joined
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
[INFO/Consumer-1] process exiting with exitcode 0
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-2: 0 * 0
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] ... done self._thread.start()
[DEBUG/Consumer-2] ... done self._thread.start()
Consumer-2: 1 * 1
Consumer-2: 2 * 2
Consumer-2: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
[INFO/Consumer-2] process shutting down
[INFO/Consumer-2] process shutting down
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] telling queue thread to quit
[DEBUG/Consumer-2] telling queue thread to quit
[INFO/MainProcess] process shutting down
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] ... queue thread joined
[DEBUG/Consumer-2] ... queue thread joined
[INFO/Consumer-2] process exiting with exitcode 0
[INFO/Consumer-2] process exiting with exitcode 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread joined
[DEBUG/MainProcess] ... queue thread joined

Do I need to initialize the multiprocessing environment somehow, or is it simply not possible to do this in a loop in the main process? I'm using Ubuntu 12.04 and Python 2.7.5.
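The doubled lines in the second run fit the way multiprocessing.log_to_stderr() behaves. Each call attaches another StreamHandler to the multiprocessing logger, and calc() calls it on every invocation, so after n calls every log record is written n times. The plain print lines such as "Consumer-2: 0 * 0" and "Result: 0 * 0 = 0" appear only once in the output above. That suggests the work itself is not being repeated.

The sketch below counts the handlers each call adds. It is written for Python 3, and the helper name handlers_added_by is hypothetical. It assumes log_to_stderr() is called with the same level as in the question.

```python
import logging
import multiprocessing


def handlers_added_by(n_calls):
    """Hypothetical helper: call multiprocessing.log_to_stderr() n_calls times
    and return how many handlers that attached to the multiprocessing logger."""
    logger = multiprocessing.get_logger()
    n_before = len(logger.handlers)
    saved_level = logger.level
    for _ in range(n_calls):
        multiprocessing.log_to_stderr(logging.DEBUG)  # same call as in calc()
    added = len(logger.handlers) - n_before
    # Undo the demo: new handlers are appended at the end of the list.
    for handler in logger.handlers[n_before:]:
        logger.removeHandler(handler)
    logger.setLevel(saved_level)
    return added


if __name__ == "__main__":
    # Mirrors calling calc() once, twice and three times.
    for n in (1, 2, 3):
        print(n, handlers_added_by(n))
```

If this is the cause, the fix is to call multiprocessing.log_to_stderr(logging.DEBUG) once instead of inside calc(), for example in the if __name__ == '__main__': block. That should remove the duplicated log lines. It does not by itself explain the memory growth.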

1 Answer

Try adding these lines at the end of calc():

for w in consumers:
    w.join()

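Calling join() on a JoinableQueue blocks until task_done() has been called for every item put into the queue. It does not guarantee that the subprocesses themselves have exited and been cleaned up. I suspect that objects stay in memory because you never join the subprocesses.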
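Below is a Python 3 sketch of the same consumer pattern with the join added. It makes these assumptions:

- It runs on a POSIX system where the "fork" start method exists, as on the asker's Ubuntu machine.
- The tasks are plain (a, b) tuples instead of Task objects, to keep it short.
- The num_jobs and num_consumers parameters are illustrative.

Results are read before the workers are joined. The multiprocessing programming guidelines warn that joining a process which still has items buffered in a queue can deadlock.

```python
import multiprocessing

# Assumption: POSIX "fork" start method (as on the asker's Ubuntu machine).
ctx = multiprocessing.get_context("fork")


class Consumer(ctx.Process):
    """Pulls (a, b) tuples from task_queue until it receives the None poison pill."""

    def __init__(self, task_queue, result_queue):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            item = self.task_queue.get()
            if item is None:
                self.task_queue.task_done()  # acknowledge the poison pill
                break
            a, b = item
            self.result_queue.put(a * b)
            self.task_queue.task_done()


def calc(num_jobs=3, num_consumers=1):
    tasks = ctx.JoinableQueue()
    results = ctx.Queue()
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for w in consumers:
        w.start()
    for i in range(num_jobs):
        tasks.put((i, i))
    for _ in range(num_consumers):
        tasks.put(None)  # one poison pill per consumer
    tasks.join()  # waits for task_done() on every item, not for the processes to exit
    # Read every result first so no worker is left blocked flushing its queue.
    answers = sorted(results.get() for _ in range(num_jobs))
    for w in consumers:
        w.join()  # reap the worker processes
    tasks.close()
    tasks.join_thread()  # wait for this process's queue feeder thread to finish
    return answers


if __name__ == "__main__":
    for _ in range(2):  # call calc() repeatedly, as in the question
        print(calc())
        print(multiprocessing.active_children())  # no leftover workers expected
```

Checking that multiprocessing.active_children() returns an empty list after each calc() call is a quick way to confirm that no workers are left behind between runs.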

answered 2014-07-15T21:11:20.060