Pythonでは、ここに私のマルチプロセッシング設定があります。Process メソッドをサブクラス化し、ピッキング/データの目的でキューとその他のフィールドを指定しました。
この戦略は約 95% の時間で機能し、残りの 5% は不明な理由でキューがハングするだけで終了しません (4 つのコアのうち 3 つがジョブを終了し、最後のコアが永遠にかかるのはよくあることなので、強制終了する必要があります)。仕事)。
Python ではキューのサイズが固定されているか、ハングすることを認識しています。私のキューには 1 つの文字列しか格納されていません... プロセッサの ID です。
私のコードが停止する正確な行は次のとおりです。
res = self._recv()
誰にもアイデアはありますか?正式なコードは以下です。ありがとうございました。
from multiprocessing import Process, Queue
from multiprocessing import cpu_count as num_cores
import codecs, cPickle
class Processor(Process):
def __init__(self, queue, elements, process_num):
super(Processor, self).__init__()
self.queue = queue
self.elements = elements
self.id = process_num
def job(self):
ddd = []
for l in self.elements:
obj = ... heavy computation ...
dd = {}
dd['data'] = obj.data
dd['meta'] = obj.meta
ddd.append(dd)
cPickle.dump(ddd, codecs.open(
urljoin(TOPDIR, self.id+'.txt'), 'w'))
return self.id
def run(self):
self.queue.put(self.job())
if __name__=='__main__':
processes = []
for i in range(0, num_cores()):
q = Queue()
p = Processor(q, divided_work(), process_num=str(i))
processes.append((p, q))
p.start()
for val in processes:
val[0].join()
key = val[1].get()
storage = urljoin(TOPDIR, key+'.txt')
ddd = cPickle.load(codecs.open(storage , 'r'))
.. unpack ddd process data ...