1

1 つの非同期シングル スレッド ツイスト/Python プロセスがあります。ポート X でリッスンする X サーバーがあります。ポート Y でリッスンする別のサーバー Y サーバーもあります。Y サーバーは X サーバーのクライアントでもあります (Y 要求を実行するには、要求を X サーバーに渡す必要があります)。 .

どちらも 1 つのスレッドで非同期に実行されます。次のように動作するはずです。

  • Y サーバーが要求を受信する
  • Y サーバーは X クライアントを作成します。X サーバーへの接続を開き、内部要求を送信します。
  • その間(!!!) X サーバーは内部要求を受け入れます
  • Xサーバーはその仕事を行い、内部応答を返します
  • X クライアント (Y サーバーに属する) は内部応答を受け入れ、それを Y 応答としてさらに送信します。

私はそのようなことを実装しようとしていましたが、おそらく deferred を使用していないために失敗しました。私が遅延を理解している限り、彼らの仕事は、上記のシーケンスを小さなチャンクに分割して、これらの部分を X と Y の両方で同時に実行できるようにすることです。

私が必要としているのは、そのようなコミュニケーションがどのように機能するかのスキームを理解することです。疑似コードで行う必要があります...


以下は、失敗した試みの短いコードです。

プロトコル/ファクトリ クラスで構成されるメイン サービス クラスがあります。

class PyCached(protocol.Protocol):
    def __init__(self, factory, verbose):
        self.factory = factory
        self.verbose = verbose
    def dataReceived(self, data):
        log.msg(data)
        if self.verbose:
            print 'received: %s' % (data,)
        request = json.loads(data)
        if self.verbose:
            print 'request: %s' % (request,)
        command = "handle_%s" % (request.pop('command'),)
        if self.verbose:
            print 'command: %s\n' % (command,)
        result = getattr(self.factory, command)(**request)
        self.transport.write(result + "\n")

class PyCachedFactory(protocol.Factory):
    def __init__(self, verbose=False):
        self.clear()
        self.start_time = datetime.now()
        self.verbose = verbose
        log.msg('starts on %s, verbose=%s' % (self.start_time, self.verbose))

    # many many more commands performed by factory

http アクセス サーバーもあります。

from twisted.web.resource import Resource
from twisted.python import log
from twisted.web.server import Site
from client import PyCachedClient

class PyCachedCommand(Resource):
    isLeaf = True

    def getServiceClient(self):
        client = PyCachedClient()
        client.connect(*self.service_address)
        return client

    def render_GET(self, request):
        '''
        Renders service status as plain text.
        '''
        log.msg('GET')
        request.setHeader('Content-Type', 'text/plain')
        try:
            client = self.getServiceClient()
            status = client.status()
            client.close()
            return "PyCached is up since %0.2f seconds" % (status['uptime'],)
        except:
            return "PyCached is down."

    def render_POST(self, request):
        '''
        Executes pycached request ad returns the response.
        '''
        log.msg('POST %s' % (str(request.args)))
        client = self.getServiceClient()
        kwargs = {k: v[0] for k,v in request.args.iteritems()}
        command_name = kwargs.pop('command')
        command = getattr(client, command_name)
        result = str(command(**kwargs))
        client.close()
        request.setHeader('Content-Type', 'text/plain')
        return result

class PyCachedSite(Site):
    '''
    Performs all operations for PyCached HTTP access.
    '''
    def __init__(self, service_address, **kwargs):
        resource = PyCachedCommand()
        resource.service_address = service_address
        Site.__init__(self, resource, **kwargs)

http は、単純なソケットで実装されているメイン サービス クライアントを使用します。これらのクライアント ソケット呼び出しがブロックされているため、おそらくこれが問題の原因です。

class PyCachedClient(object):
    def __init__(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def connect(self, host, port):
        try:
            self.s.connect((host, port))
        except socket.error:
            raise RuntimeError('Something went wrong with PyCached.')

    def close(self):
        self.s.close()

    def _receive(self):
        received = self.s.recv(1024)
        decoded = json.loads(received.rstrip('\n'))
        return decoded['value'] if decoded.has_key('value') else None

    def _send(self, command, options={}):
        request = {'command': command}
        request.update(options)
        self.s.sendall(json.dumps(request))

    def version(self):
        self._send('version')
        return self._receive()

    # many many more commands similar to version

最後に、すべてが twistd/TAC ファイルによって実行されるため、単一のスレッドに常駐します。

from twisted.application import internet, service
from server.service import PyCachedFactory
from server.http import PyCachedSite

application = service.Application('pycached')
# pycached core service
pycachedService = internet.TCPServer(8001, PyCachedFactory())
pycachedService.setServiceParent(application)
# pycached http access
addr = ('localhost', 8001)
pycachedHttp = internet.TCPServer(8002, PyCachedSite(addr))
pycachedHttp.setServiceParent(application)

たとえば、8001 (メイン サービス) に telnet すると{"command":"version"}、すべて問題ありません。しかし、http に問い合わせると、クライアント ソケットがブロックされており、メイン サービスが応答しないため、すべてがブロックされます。

4

1 に答える 1