3

複数のコアを備えたシステムで実行する数万のシミュレーションがあります。現在、入力パラメーターを知っているシリアルで実行され、結果を dict に保存します。

シリアル版

import time
import random

class MyModel(object):
    input = None
    output = None

    def run(self):
        time.sleep(random.random())  # simulate a complex task
        self.output = self.input * 10


# Run serial tasks and store results for each parameter

parameters = range(10)
results = {}

for p in parameters:
    m = MyModel()
    m.input = p
    m.run()
    results[p] = m.output

print('results: ' + str(results))

所要時間は 10 秒未満で、正しい結果が表示されます。

results: {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90}

パラレル版

この手順を並列化する試みは、 「キューを使用してワーカー プロセスのコレクションにタスクをフィードし、結果を収集する方法を示す例」というmultiprocessingテキストの近くのモジュールの例に基づいています (申し訳ありませんが、利用できる URL アンカーはありません)。

以下は、シリアル バージョンの上半分に基づいています。

from multiprocessing import Process, Queue
NUMBER_OF_PROCESSES = 4

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        m = MyModel()
        m.input = args[0]
        m.run()
        output.put(m.output)


# Run parallel tasks and store results for each parameter

parameters = range(10)
results = {}

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
tasks = [(t,) for t in parameters]
for task in tasks:
    task_queue.put(task)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
for i in range(len(tasks)):
    results[i] = done_queue.get()

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))

数秒しかかかりませんが、入力と結果の間のマッピング順序が混同されています。

results: {0: 10, 1: 0, 2: 60, 3: 40, 4: 20, 5: 80, 6: 30, 7: 90, 8: 70, 9: 50}

resultsunordered に基づいて にデータを入力していることに気付きましたdone_queue.get()が、 への正しいマッピングを取得する方法がわかりませんtask_queue。何か案は?これを何とかきれいにする他の方法はありますか?

4

1 に答える 1

1

あはは!ワーカーは、返されたプロセスを識別するために使用できる、出力キューに返すために使用される入力パラメーターなど、ある種の ID を埋め込む必要があります。必要な変更は次のとおりです。

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        m = MyModel()
        m.input = args[0]
        m.run()
        # Return a tuple of an ID (the input parameter), and the model output
        return_obj = (m.input, m.output)
        output.put(return_obj)

# Get unordered results
for i in range(len(tasks)):
    # Unravel output tuple, which has the input parameter 'p' used as an ID
    p, result = done_queue.get()
    results[p] = result
于 2013-07-04T06:32:49.140 に答える