0

数十万行のファイルがあり、その各行は同じプロセス (共分散の計算) を受ける必要があります。かなり時間がかかるので、マルチスレッドにするつもりでした。ただし、私が見たすべての例/チュートリアルは、私がやりたいことに対してかなり複雑でした。2 つのモジュールを一緒に使用する方法を説明する優れたチュートリアルを誰かが教えてくれれば、それは素晴らしいことです。

4

1 に答える 1

0

何かを並行して処理する必要があるときはいつでも、これに似たものを使用します (既存のスクリプトからこれを切り取っただけです)。

#!/usr/bin/env python2
# This Python file uses the following encoding: utf-8

import os, sys, time
from multiprocessing import Queue, Manager, Process, Value, Event, cpu_count

class ThreadedProcessor(object):
  def __init__(self, parser, input_file, output_file, threads=cpu_count()):
    self.parser = parser

    self.num_processes = threads
    self.input_file = input_file
    self.output_file = output_file

    self.shared_proxy = Manager()

    self.input_queue = Queue()
    self.output_queue = Queue()

    self.input_process = Process(target=self.parse_input)
    self.output_process = Process(target=self.write_output)

    self.processes = [Process(target=self.process_row) for i in range(self.num_processes)]

    self.input_process.start()
    self.output_process.start()

    for process in self.processes:
      process.start()

    self.input_process.join()

    for process in self.processes:
      process.join()

    self.output_process.join()

  def parse_input(self):
    for index, row in enumerate(self.input_file):
      self.input_queue.put([index, row])

    for i in range(self.num_processes):
      self.input_queue.put('STOP')

  def process_row(self):
    for index, row in iter(self.input_queue.get, 'STOP'):
      self.output_queue.put([index, row[0], self.parser.parse(row[1])])

    self.output_queue.put('STOP')

  def write_output(self):
    current = 0
    buffer = {}

    for works in range(self.num_processes):
      for index, id, row in iter(self.output_queue.get, 'STOP'):
        if index != current:
          buffer[index] = [id] + row
        else:
          self.output_file.writerow([id] + row)
          current += 1

          while current in buffer:
            self.output_file.writerow(buffer[current])
            del buffer[current]
            current += 1

基本的に、ファイルの読み取り/書き込みを管理する 2 つのプロセスがあります。1 つは入力を読み取って解析し、もう 1 つは「完了」キューから読み取って出力ファイルに書き込みます。他のプロセスが生成され (この場合、数は CPU が持つプロセッサ コアの総数と同じです)、それらはすべて入力キューからの要素を処理します。

于 2012-05-29T07:16:16.560 に答える