2

Yahoo Finance から複数のスレッドを使用して大量のデータをダウンロードしようとしています。私はconcurrent.futures.ThreadPoolExecutor物事をスピードアップするために使用しています。利用可能なすべてのファイル記述子 (デフォルトでは 1024) を消費するまで、すべてがうまくいきます。

urllib.request.urlopen()例外が発生すると、ファイル記述子は開いたままになります (使用するソケットのタイムアウトに関係なく) 。通常、このファイル記述子は、単一の (メイン) スレッドからのみ実行する場合に再利用されるため、この問題は発生しません。しかし、これらの例外的なurlopen()呼び出しがThreadPoolExecutorスレッドから行われると、これらのファイル記述子は開いたままになります。これまでに思いついた唯一の解決策ProcessPoolExecutorは、非常に面倒で非効率的なプロセス ( ) を使用するか、許可されるファイル記述子の数を非常に大きなものに増やすことです (私のライブラリのすべての潜在的なユーザーがとにかくこれを行うわけではありません)。 )。この問題に対処するためのよりスマートな方法が必要です。

また、これはPythonライブラリのバグなのか、それとも何か間違っているのでしょうか...

Debian で Python 3.4.1 を実行しています (テスト、カーネル 3.10-3-amd64)。

これは、この動作を示すサンプル コードです。

import concurrent
import concurrent.futures
import urllib.request
import os
import psutil
from time import sleep


def fetchfun(url):
    urllib.request.urlopen(url)


def main():

    print(os.getpid())
    p = psutil.Process(os.getpid())
    print(p.get_num_fds())


    # this url doesn't exist
    test_url = 'http://ichart.finance.yahoo.com/table.csv?s=YHOOxyz' + \
            '&a=00&b=01&c=1900&d=11&e=31&f=2019&g=d'

    with concurrent.futures.ThreadPoolExecutor(1) as executor:
        futures = []
        for i in range(100):
            futures.append(executor.submit(fetchfun, test_url))
        count = 0
        for future in concurrent.futures.as_completed(futures):
            count += 1
            print("{}: {} (ex: {})".format(count, p.get_num_fds(), future.exception()))

    print(os.getpid())
    sleep(60)


if __name__ == "__main__":
    main()
4

1 に答える 1

3

が呼び出されると、リクエストのオブジェクトへの参照を の属性としてHTTPError保存します。その参照はリストに保存され、プログラムが終了するまで破棄されません。つまり、プログラム全体で生き続けていることへの参照があるということです。その参照が存在する限り、で使用されるソケットは開いたままになります。これを回避する方法の 1 つは、例外を処理するときに明示的に閉じることです。HTTPResponsefpHTTPErrorfuturesHTTPResponseHTTPResponseHTTPResponse

with concurrent.futures.ThreadPoolExecutor(1) as executor:
    futures = []
    for i in range(100):
        futures.append(executor.submit(fetchfun, test_url))
    count = 0
    for future in concurrent.futures.as_completed(futures):
        count += 1
        exc = future.exception()
        print("{}: {} (ex: {})".format(count, p.get_num_fds(), exc))
        exc.fp.close()  # Close the HTTPResponse
于 2014-09-19T14:48:01.630 に答える