4

100k ヘッドのリクエストを行う必要があり、リクエストに加えて gevent を使用しています。私のコードはしばらく実行されますが、最終的にハングします。なぜハングしているのか、リクエスト内でハングしているのかgevent内でハングしているのかはわかりません。リクエストとgeventの両方でタイムアウト引数を使用しています。

以下のコード スニペットを見て、何を変更すればよいか教えてください。

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

grequests を試してみましたが、放棄されました。github のプル リクエストも試しましたが、すべて問題があります。

4

2 に答える 2

9

観察している RAM の使用量は、主に 100.000 の応答オブジェクトを保存している間に蓄積されたすべてのデータと、基礎となるすべてのオーバーヘッドに起因しています。あなたのアプリケーション ケースを再現し、Alexa ランキングの上位 15000 個の URL に対して HEAD リクエストを送信しました。それは本当に問題ではありませんでした

  • gevent プール (つまり、接続ごとに 1 つの greenlet) を使用したか、すべてが複数の URL を要求する固定セットの greenlet を使用したか
  • プールサイズをどのくらいの大きさに設定するか

最終的に、RAM の使用量は時間とともに増加し、かなりの量になりました。requestsただし、 からに変更するurllib2と、RAM の使用量が約 2 分の 1 に減少することに気付きました。つまり交換した

result = requests.head(url)

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

その他のアドバイス: 2 つのタイムアウト メカニズムを使用しないでください。Gevent のタイムアウト アプローチは非常に堅実で、次のように簡単に使用できます。

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

ややこしく見えるかもしれませんが、None(正確に 5 秒後にタイムアウトを示します)、通信エラーを表す例外オブジェクト、または応答のいずれかを返します。よく働く!

これはおそらく問題の一部ではありませんが、そのような場合、ワーカーを生かしておき、それぞれに複数のアイテムで作業させることをお勧めします! greenlets を生成するオーバーヘッドは、確かに小さいです。それでも、これは長寿命の greenlet のセットを使用した非常に単純なソリューションになります。

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

また、これは大規模な問題に取り組んでおり、非標準的な問題を引き起こしていることを認識する必要があります。20 秒のタイムアウトと 100 のワーカーと 15000 の URL を要求するシナリオを再現したところ、多数のソケットを簡単に取得できました。

# netstat -tpn | wc -l
10074

つまり、OS には 10000 を超えるソケットがあり、そのほとんどが TIME_WAIT 状態でした。「開いているファイルが多すぎます」というエラーも確認し、sysctl を介して制限を調整しました。100.000 の URL を要求すると、おそらくそのような制限にも達するため、システムの枯渇を防ぐための対策を講じる必要があります。

また、リクエストの使用方法にも注意してください。HTTP から HTTPS へのリダイレクトを自動的にたどり、証明書を自動的に検証します。これらはすべて、確実に RAM を消費します。

私の測定では、要求された URL の数をプログラムの実行時間で割ったときに、100 応答/秒を渡すことはほとんどありませんでした。これは、世界中の外部サーバーへの接続の待ち時間が長いためです。あなたもそのような制限の影響を受けていると思います。アーキテクチャの残りの部分をこの制限に合わせて調整すると、インターネットからディスク (またはデータベース) へのデータ ストリームを生成する際に、RAM の使用量がそれほど大きくない可能性があります。

具体的には、次の 2 つの主な質問にお答えください。

gevent/使用方法は問題ではないと思います。あなたは自分の仕事の複雑さを過小評価していると思います。それは厄介な問題を伴い、システムを限界まで追い込みます。

  • RAM 使用量の問題:urllib2可能であれば、 を使用して開始します。そして、それでも物があまりにも多く蓄積する場合は、蓄積に対抗する必要があります。安定した状態を作り出すようにしてください。ディスクへのデータの書き込みを開始し、通常はオブジェクトがガベージ コレクトされる状況に向けて作業することをお勧めします。

  • あなたのコードは「最終的にハングアップします」: おそらくこれは RAM の問題です。そうでない場合は、あまり多くの greenlet を生成せず、示されているようにそれらを再利用します。また、並行性をさらに減らし、開いているソケットの数を監視し、必要に応じてシステムの制限を増やし、ソフトウェアがハングする場所を正確に見つけようとします。

于 2015-02-07T02:10:18.247 に答える