5

ウェブサイトから大量のファイルをダウンロードして処理したい。サイトの利用規約により、1 秒あたりにダウンロードできるファイル数が制限されています。

ファイルの処理にかかる時間が実はボトルネックなので、複数のファイルを並行して処理できるようにしたい。しかし、さまざまなプロセスが組み合わさってダウンロード制限に違反することは望ましくありません。したがって、オーバーリクエスト率を制限するものが必要です。私は次のようなことを考えていましたが、私はmultiprocessingモジュールの専門家ではありません。

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

次に、別の場所でダウンロードを実行します

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

これは小規模ではうまくいっているように見えますが、ロックが本当に正しく行われているのか少し心配です。

また、同じ目標を達成するためのより良いパターンがあれば、ぜひ聞きたいです。

4

5 に答える 5