同時に行うには 2 つの方法があります。または、実際には、2-3/4 程度:
- 複数のスレッド
- または複数のプロセス、特に「もの」が多くの CPU パワーを消費する場合
- または、特に何千もの「もの」がある場合は、コルーチンまたはグリーンレット
- または上記のいずれかのプール
- イベントループ (手動でコーディング)
- または、 のようなハイブリッド グリーンレット/イベント ループ システム
gevent
。
1000 個の URL がある場合、おそらく同時に 1000 個のリクエストを実行したくないでしょう。たとえば、Web ブラウザは通常、一度に 8 つのリクエストしか処理しません。プールは、一度に 8 つのことだけを行うのに適した方法なので、そうしましょう。
また、一度に 8 つのことしか実行せず、それらは主に I/O バウンドであるため、スレッドは完璧です。
で実装しますfutures
。(Python 2.x または 3.0-3.1 を使用している場合は、バックポートをインストールする必要がありますfutures
。)
import concurrent.futures
urls = ['http://example.com/foo',
'http://example.com/bar']
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
result = b''.join(executor.map(download, urls))
with open('output_file', 'wb') as f:
f.write(result)
もちろん、download
関数を作成する必要がありますが、これは、一度に 1 つずつ実行する場合に作成する関数とまったく同じです。
たとえば、urlopen
(Python 2.x を使用している場合は、urllib2
の代わりに使用しますurllib.request
):
def download(url):
with urllib.request.urlopen(url) as f:
return f.read()
スレッド プール エグゼキュータを自分で構築する方法を学びたい場合、ソースは実際には非常に単純でありmultiprocessing.pool
、stdlib のもう 1 つの優れた例です。
ただし、どちらにも多くの余分なコード (メモリ使用量を改善するための弱い参照の処理、クリーンなシャットダウン、結果を待機するさまざまな方法の提供、例外の適切な伝播など) があり、邪魔になる可能性があります。
PyPI と ActiveState を見回すと、threadpool
理解しやすいようなシンプルなデザインが見つかります。
しかし、これが最も単純な結合可能なスレッドプールです。
class ThreadPool(object):
def __init__(self, max_workers):
self.queue = queue.Queue()
self.workers = [threading.Thread(target=self._worker) for _ in range(max_workers)]
def start(self):
for worker in self.workers:
worker.start()
def stop(self):
for _ in range(self.workers):
self.queue.put(None)
for worker in self.workers:
worker.join()
def submit(self, job):
self.queue.put(job)
def _worker(self):
while True:
job = self.queue.get()
if job is None:
break
job()
もちろん、非常に単純な実装の欠点は、次のように使いにくいことconcurrent.futures.ThreadPoolExecutor
です。
urls = ['http://example.com/foo',
'http://example.com/bar']
results = [list() for _ in urls]
results_lock = threading.Lock()
def download(url, i):
with urllib.request.urlopen(url) as f:
result = f.read()
with results_lock:
results[i] = url
pool = ThreadPool(max_workers=8)
pool.start()
for i, url in enumerate(urls):
pool.submit(functools.partial(download, url, i))
pool.stop()
result = b''.join(results)
with open('output_file', 'wb') as f:
f.write(result)