0

Pythonでは、ここに私のマルチプロセッシング設定があります。Process メソッドをサブクラス化し、ピッキング/データの目的でキューとその他のフィールドを指定しました。

この戦略は約 95% の時間で機能し、残りの 5% は不明な理由でキューがハングするだけで終了しません (4 つのコアのうち 3 つがジョブを終了し、最後のコアが永遠にかかるのはよくあることなので、強制終了する必要があります)。仕事)。

Python ではキューのサイズが固定されているか、ハングすることを認識しています。私のキューには 1 つの文字列しか格納されていません... プロセッサの ID です。

私のコードが停止する正確な行は次のとおりです。

res = self._recv() 

誰にもアイデアはありますか?正式なコードは以下です。ありがとうございました。

from multiprocessing import Process, Queue
from multiprocessing import cpu_count as num_cores
import codecs, cPickle

class Processor(Process):

    def __init__(self, queue, elements, process_num):
        super(Processor, self).__init__()
        self.queue = queue
        self.elements = elements
        self.id = process_num

    def job(self):

        ddd = []

        for l in self.elements:

            obj = ... heavy computation ...

            dd = {}
            dd['data'] = obj.data
            dd['meta'] = obj.meta
        ddd.append(dd)

        cPickle.dump(ddd, codecs.open(
            urljoin(TOPDIR, self.id+'.txt'), 'w'))

        return self.id

    def run(self):
        self.queue.put(self.job())



 if __name__=='__main__':

        processes = []

        for i in range(0, num_cores()):

            q = Queue()
            p = Processor(q, divided_work(), process_num=str(i))
            processes.append((p, q))
            p.start()

        for val in processes:

            val[0].join()
            key = val[1].get() 

            storage = urljoin(TOPDIR, key+'.txt')

            ddd = cPickle.load(codecs.open(storage , 'r'))

            .. unpack ddd process data ...
4

2 に答える 2