ウェブサイトから大量のファイルをダウンロードして処理したい。サイトの利用規約により、1 秒あたりにダウンロードできるファイル数が制限されています。
ファイルの処理にかかる時間が実はボトルネックなので、複数のファイルを並行して処理できるようにしたい。しかし、さまざまなプロセスが組み合わさってダウンロード制限に違反することは望ましくありません。したがって、オーバーリクエスト率を制限するものが必要です。私は次のようなことを考えていましたが、私はmultiprocessing
モジュールの専門家ではありません。
import multiprocessing
from multiprocessing.managers import BaseManager
import time
class DownloadLimiter(object):
def __init__(self, time):
self.time = time
self.lock = multiprocessing.Lock()
def get(self, url):
self.lock.acquire()
time.sleep(self.time)
self.lock.release()
return url
class DownloadManager(BaseManager):
pass
DownloadManager.register('downloader', DownloadLimiter)
class Worker(multiprocessing.Process):
def __init__(self, downloader, queue, file_name):
super().__init__()
self.downloader = downloader
self.file_name = file_name
self.queue = queue
def run(self):
while not self.queue.empty():
url = self.queue.get()
content = self.downloader.get(url)
with open(self.file_name, "a+") as fh:
fh.write(str(content) + "\n")
次に、別の場所でダウンロードを実行します
manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()
urls = range(50)
for url in urls:
queue.put(url)
job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]
for job in jobs:
job.start()
for job in jobs:
job.join()
これは小規模ではうまくいっているように見えますが、ロックが本当に正しく行われているのか少し心配です。
また、同じ目標を達成するためのより良いパターンがあれば、ぜひ聞きたいです。