4

私はツイストにかなり慣れていないので、いくつかの URL をフェッチし、結果を URL ごとに個別のファイルに保存する非同期クライアントを作成しようとしています。限られた数のサーバー (たとえば 10 台) でプログラムを実行すると、リアクター ループが正しく終了し、プログラムが終了します。しかし、たとえば Alexa top 2500 でプログラムを実行すると、プログラムは URL のフェッチを開始しますが、終了しません。タイムアウトを設定しましたが、機能しません。エラーまたは成功のコールバックをトリガーしないオープン ソケットが必要であると考えています。私の目的は、プログラムがページをフェッチするか、接続ごとのタイムアウトが期限切れになったら、プログラムを終了してアクティブなすべてのファイル記述子を閉じる必要があることです。

申し訳ありませんが、コードのインデントがコピー アンド ペースト中に保持されません。現在、確認したところ修正されました。コードは例を示すための最低限のものです。私の問題では、クロールするサイトが膨大な数でプログラムを開始すると、リアクターが停止しないことに注意してください。

#!/usr/bin/env python

from pprint import pformat
from twisted.internet import reactor
import twisted.internet.defer
import sys
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished, output):
         self.whenFinished = whenFinished
         self.output = output

    def dataReceived(self, bytes):
         #print '##### Received #####\n%s' % (bytes,)
         self.output.write('%s' % (bytes,))

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.output.write('Finished: %s \n'%(reason.getErrorMessage()))
        self.output.write('#########end########%s\n'%(reason.getErrorMessage()))
        self.whenFinished.callback(None)

def handleResponse(r, output, url):
    output.write('############start############\n')
    output.write('%s\n'%(url))
    #print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    output.write("version=%s\ncode=%s\nphrase='%s'"\
             %(r.version, r.code, r.phrase))
    for k, v in r.headers.getAllRawHeaders():
        #print "%s: %s" % (k, '\n  '.join(v))
        output.write("%s: %s\n" % (k, '\n  '.join(v)))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished, output))
    return whenFinished

def handleError(reason):
    print reason
    #reason.printTraceback()
    #reactor.stop()

def getPage(url, output):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET',
                       url,
    Headers({'User-Agent': ['Mozilla/4.0 (Windows XP 5.1) Java/1.6.0_26']}),
                       None)
    d._connectTimeout = 10
    d.addCallback(handleResponse, output, url)
    d.addErrback(handleError)
    return d

if __name__ == '__main__':
    semaphore = twisted.internet.defer.DeferredSemaphore(500)
    dl = list()
    ipset = set()
    queryset =  set(['http://www.google.com','http://www.google1.com','http://www.google2.com', "up to 2500 sites"])
    filemap = {}
    for q in queryset:
        fpos = q.split('http://')[1].split(':')[0]
        dl.append(semaphore.run(getPage, q, filemap[fpos]))
    dl = twisted.internet.defer.DeferredList(dl)
    dl.addCallbacks(lambda x: reactor.stop(), handleError)
    reactor.run()
    for k in filemap:
        filemap[k].close()

ありがとう。ジェッポ

4

1 に答える 1

8

タイムアウト コードには少なくとも 2 つの問題があります。

まず、設定する唯一のタイムアウトは_connectTimeoutであり、Deferred返された fromに設定しますAgent.request。これは無意味な属性であり、Agent実装や Twisted のどの部分もこれを尊重しません。Agent代わりに、この属性をインスタンスに設定するつもりだったと思います。ただし、これは直接対話するためのものではないプライベート属性です。代わりにconnectTimeout=10Agentイニシャライザに渡す必要があります。

次に、このタイムアウトは TCP 接続セットアップのタイムアウトにのみ影響します。これを に設定すると10、特定の URL の HTTP サーバーへの TCP 接続が 10 秒以内に確立できない場合、要求の試行はタイムアウト エラーで失敗します。ただし、接続が 10 秒未満で正常に確立された場合、タイムアウトはそれ以上の意味を持ちません。サーバーが応答を送信するのに 10 時間かかる場合、 はAgentそこに座って 10 時間待機します。追加のタイムアウト、つまりリクエスト全体のタイムアウトが必要です。

reactor.callLaterこれは、おそらく とを使用して個別に実装するものDeferred.cancelです。例えば、

...
d = agent.request(...)
timeoutCall = reactor.callLater(60, d.cancel)
def completed(passthrough):
    if timeoutCall.active():
        timeoutCall.cancel()
    return passthrough
d.addBoth(completed)
...
于 2013-02-28T18:08:53.920 に答える