私はツイストを使用して単純な xmlrpc サーバーをバックエンドとしてセットアップしました。これは永続的であり、必要に応じて PyQt/PySide ベースの Webkit ブラウザーの新しいインスタンスを作成したり、既存のものを再利用したりするタスクがあります。ユーザーとバックエンド アプリケーションの間の「仲介者」である別のフロントエンド webapp は、session_id と共にコマンドをツイスト xmlrpc サーバーに送信します。これにより、ツイスト サーバーは、session_id が既に実行中のブラウザーを持っているかどうかを調べることができます。新しいものをインスタンス化する必要があります。簡単な例を次に示します。
# -*- coding: utf-8 -*-
from pyvirtualdisplay import Display
display = Display(visible=False, size=(1024, 768), color_depth=24)
display.start()
from PySide.QtGui import QApplication
app = QApplication([])
import qt4reactor
qt4reactor.install()
from twisted.web import server
from twisted.web.xmlrpc import XMLRPC
from twisted.internet import defer
from PySide.QtWebKit import QWebView, QWebPage
from PySide.QtNetwork import QNetworkAccessManager, QNetworkRequest
from PySide.QtCore import QUrl, QByteArray
class Browser(object):
def __init__(self):
self.network_manager = QNetworkAccessManager()
self.web_page = QWebPage()
self.web_page.setNetworkAccessManager(self.network_manager)
self.web_view = QWebView()
self.web_view.setPage(self.web_page)
self.web_view.loadFinished.connect(self._load_finished)
def _load_finished(self, ok):
# the page is loaded
frame = self.web_view.page().mainFrame()
self.deferred_request.callback(frame.toHtml())
def _make_request(self, url):
request = QNetworkRequest()
request.setUrl(QUrl(url))
return request
def perform(self, request_data):
# say request_data is a dict having keys: 'url', 'post_data'
self.deferred_request = defer.Deferred()
request = self._make_request(request_data['url'])
self.web_view.load(request)
return self.deferred_request
class TestXMLRPCServer(XMLRPC):
def __init__(self):
XMLRPC.__init__(self, allowNone=True)
self.browser_instances = dict()
def xmlrpc_open(self, request_data, session_id):
print session_id, request_data
try:
browser = self.browser_instances[session_id]
print 'using existing instance'
except KeyError:
browser = Browser()
self.browser_instances[session_id] = browser
print 'new instance created'
return browser.perform(request_data)
def start_server(port=1234):
from twisted.internet import reactor
r = TestXMLRPCServer()
reactor.listenTCP(port, server.Site(r))
reactor.run()
if __name__ == '__main__':
start_server()
そして、機能をテストするためだけのクライアント:
# -*- coding: utf-8 -*-
import xmlrpclib
def test_server(port=1234):
s = xmlrpclib.Server('http://localhost:{0}/'.format(port))
session_id = 'a1b2c3d4e5f6'
html = s.open({'url': 'http://www.google.com'}, session_id)
print unicode(html).encode('utf-8')
html = s.open({'url': 'http://www.ubuntu.com'}, session_id)
print unicode(html).encode('utf-8')
session_id = 'f6e5d4c3b2a1'
html = s.open({'url': 'http://www.yahoo.com'}, session_id)
print unicode(html).encode('utf-8')
if __name__ == '__main__':
test_server()