6

Python マルチプロセッシングについて質問があります。データセットを取得し、チャンクに分割し、それらのチャンクを同時に実行中のプロセスに渡そうとしています。簡単な計算を使用して大規模なデータ テーブルを変換する必要があります (例: 電気抵抗 -> サーミスタの温度)。

以下にリストされているコードは、ほぼ希望どおりに機能しますが、新しいプロセスを生成していないようです (または、一度に 1 つだけ)。

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3
    
    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()
        
if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3
    
    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')
        
    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
4

3 に答える 3

1

各プロセスにチャンクの数を送信する必要はありません。get_nowait()を使用して、最終的なQueue.Empty例外を処理するだけです。すべてのプロセスは異なる量のCPU時間を取得し、これによりすべてのプロセスがビジー状態に保たれるはずです。

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()
于 2009-04-13T05:20:54.220 に答える
1

runメソッドをオーバーライドしていません。プロセス(またはスレッド)でコードを実行する方法は2つあります。

  1. ターゲットを指定するプロセスを作成します
  2. プロセスをサブクラス化し、メソッドをオーバーライドしますrun

オーバーライド__init__するということは、プロセスがすべてドレスアップされ、行き場がないことを意味します。実行する必要があることを実行するために必要な属性を与えるために使用する必要がありますが、実行するタスクを指定するべきではありません。

あなたのコードでは、すべての重労働はこの行で行われます:

exec('worker'+str(i)+' = Worker(tmp)')

ここでは何も行われません。

exec('worker'+str(i)+'.start()')

したがって、で結果を確認すると、exec('print worker'+str(i)+'.result[0]')何か意味のあるものが得られるはずですが、実行したいコードが実行さたからですが、プロセスの開始時ではなく、プロセスの構築時にです。

これを試して:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

編集:

さて...だから私はここで私の糸脱毛の本能に基づいて飛んでいました、そしてそれらはすべて間違っていました。プロセスについて私たち二人が理解していなかったことは、変数を直接共有できないということです。開始するために新しいプロセスに渡すものはすべて、読み取られ、コピーされ、永久に失われます。データを共有するための2つの標準的な方法のいずれかを使用しない限り:キューとパイプ。私はあなたのコードを機能させるために少し遊んだことがありますが、今のところ運がありません。私はそれがあなたを正しい軌道に乗せると思います。

于 2009-04-11T20:53:34.027 に答える
0

さて、リストはスレッドセーフではなかったようで、Queue を使用するようになりました (ただし、はるかに遅いように見えます)。このコードは、本質的に私がやろうとしていたことを達成します:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
于 2009-04-13T04:25:52.777 に答える