7

私が取り組んでいるスクレイピープロジェクトで設定しようとしている永続的なメッセージキューに txredis (redis のノンブロッキングツイスト API) を使用しようとして失敗しました。クライアントはブロックしていませんでしたが、リアクター ループ内の 1 つのイベントであったはずのものが数千のステップに分割されたため、本来よりもはるかに遅くなることがわかりました。

その代わりに、redis-py (通常のブロッキング ツイスト API) を利用して、遅延スレッドで呼び出しをラップしてみました。それはうまく機能しますが、さらに高速化するために接続プーリングを設定したいので、redis を呼び出すときに内部遅延を実行したいと考えています。

以下は、私の使用例を説明するために、遅延スレッドのツイスト ドキュメントから取得したサンプル コードの解釈です。

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

そして、遅延スレッドのコードをブロックする接続プーリングの私の変更は次のとおりです。

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

私の質問は、なぜ私の変更により遅延スレッドがブロックされるのか、またはより良い解決策を誰かが提案できるのかを知っている人はいますか?

4

3 に答える 3

12

さて、ねじれたドキュメントが言うように:

Deferred はコードを魔法のようにブロックしない

などのブロッキング コードを使用している場合は常に、sleepそれを新しいスレッドに延期する必要があります。

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

redis API がそれほど複雑でない場合は、多くのスレッドでブロッキング API を呼び出すだけでなく、twisted.web を使用して書き直す方が自然かもしれません。

于 2010-03-17T22:06:48.320 に答える
3

また、Redis 2.x の新しいプロトコルと機能を既にサポートしている、twisted 用の最新の Redis クライアントもあります。ぜひ試してみてください。これは txredisapi と呼ばれます。

永続的なメッセージ キューについては、RestMQ をお勧めします。サイクロンと txredisapi の上に構築された redis ベースのメッセージ キュー システム。

http://github.com/gleicon/restmq

乾杯

于 2010-09-10T16:58:19.920 に答える
0

これに関連して、特に Twisted 用に作成された Redis クライアント ( http://github.com/deldotdr/txRedisなど) を使用することで、おそらく多くのことを得ることができます。

于 2010-03-18T18:27:38.960 に答える