5

3つのプロセスを起動していますが、プロセス(i)に対応するインデックスで、文字列を共有配列に配置する必要があります。

以下のコードを見てください。生成される出力は次のとおりです。

['test 0', None, None]
['test 1', 'test 1', None]
['test 2', 'test 2', 'test 2']

'test 0'がtest 1、およびtest 1によって上書きされるのはなぜtest 2ですか?

私が欲しいのは(順序は重要ではありません):

['test 0', None, None]
['test 0', 'test 1', None]
['test 0', 'test 1', 'test 2']

コード :

#!/usr/bin/env python

import multiprocessing
from multiprocessing import Value, Lock, Process, Array
import ctypes
from ctypes import c_int, c_char_p

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, arr, lock):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.arr = arr
            self.lock = lock

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(arr=self.arr, lock=self.lock)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, arr=None, lock=None):
        with lock:
            arr[self.i] = "test %d" % self.i
            print arr[:]

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'

if __name__ == '__main__':
   tasks = multiprocessing.JoinableQueue()
   results = multiprocessing.Queue()

   arr = Array(ctypes.c_char_p, 3)

   lock = multiprocessing.Lock()

   num_consumers = multiprocessing.cpu_count() * 2
   consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

Python 2.7.3(Ubuntu)を実行しています

4

1 に答える 1

5

この問題はこれに似ているようです。そこで、JFセバスティアンは、への割り当てが、割り当てを行うサブプロセスにとってのみ意味のあるメモリアドレスをarr[i]指していると推測しました。arr[i]他のサブプロセスは、そのアドレスを調べるときにガベージを取得します。

この問題を回避するには、少なくとも2つの方法があります。multiprocessing.manager1つは、リストを使用することです。

import multiprocessing as mp

class Consumer(mp.Process):
    def __init__(self, task_queue, result_queue, lock, lst):
            mp.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.lock = lock
            self.lst = lst

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(lock = self.lock, lst = self.lst)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, lock, lst):
        with lock:
            lst[self.i] = "test {}".format(self.i)
            print([lst[i] for i in range(3)])

if __name__ == '__main__':
   tasks = mp.JoinableQueue()
   results = mp.Queue()
   manager = mp.Manager()
   lst = manager.list(['']*3)

   lock = mp.Lock()
   num_consumers = mp.cpu_count() * 2
   consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

   tasks.join()

もう1つの方法は、などの固定サイズの共有配列を使用することmp.Array('c', 10)です。

import multiprocessing as mp

class Consumer(mp.Process):
    def __init__(self, task_queue, result_queue, arr, lock):
            mp.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.arr = arr
            self.lock = lock

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(arr = self.arr, lock = self.lock)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, arr, lock):
        with lock:
            arr[self.i].value = "test {}".format(self.i)
            print([a.value for a in arr])

if __name__ == '__main__':
   tasks = mp.JoinableQueue()
   results = mp.Queue()
   arr = [mp.Array('c', 10) for i in range(3)]

   lock = mp.Lock()
   num_consumers = mp.cpu_count() * 2
   consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

   tasks.join()

これが機能mp.Array(ctypes.c_char_p, 3)しない場合に機能する理由mp.Array('c', 10)は、サイズが固定されているためメモリアドレスが変更されないのに対しmp.Array(ctypes.c_char_p, 3)、サイズが可変であるためarr[i]、より大きな文字列に割り当てられるとメモリアドレスが変更される可能性があるためだと推測します。

おそらくこれは、ドキュメントが述べているときに警告していることです、

ポインタを共有メモリに格納することは可能ですが、これは特定のプロセスのアドレス空間内の場所を参照することに注意してください。ただし、2番目のプロセスのコンテキストではポインターが無効である可能性が非常に高く、2番目のプロセスからポインターを逆参照しようとするとクラッシュが発生する可能性があります。

于 2013-01-08T19:46:14.793 に答える