1

TwistedAMPの使い方を学んでいます。クライアントからサーバーにデータを送信し、SQLite3DBにデータを挿入するプログラムを開発しています。次に、サーバーは成功またはエラーを示す結果をクライアントに送り返します(これを行うには最善の方法ではないかもしれませんが、主な問題を解決するための一時的な解決策にすぎません)。これを行うために、元々合計を行って結果を返した例を変更したので、これが私がやろうとしていることを行うための最も効率的な方法ではない可能性があることに気付きました。特に、複数の挿入でいくつかのタイミングを実行しようとしています(つまり、複数の挿入のためにデータをサーバーに複数回送信します)。作成したコードを含めました。

ClientCreatorをreactor.callWhenRunning()に渡すなど、これを回避するいくつかの方法を試しましたが、延期してこれを行うことはできません。

これを行う方法についての提案、アドバイス、またはヘルプをいただければ幸いです。これがコードです。

サーバ:

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import sqlite3, time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class Protocol(amp.AMP):
    def __init__(self):     
       self.conn = sqlite3.connect('biomed1.db')
       self.c =self.conn.cursor()
       self.res=None

    @Insert.responder
    def dbInsert(self, data):
        self.InsertDB(data) #call the DB inserter
        result=self.res     # send back the result of the insertion
        return {'insert_result': result}

    def InsertDB(self,data):
      tm=time.time()
      print "insert time:",tm
      chx=data
      PID=2
      device_ID=5
      try:
        self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))    
      except Exception, err:
             print err
             self.res=0
      else:
             self.res=1

      self.conn.commit()


pf = Factory()
pf.protocol = Protocol
reactor.listenTCP(1234, pf) 
reactor.run()

クライアント:

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

def connected(protocol):
    return protocol.callRemote(Insert, data=5555).addCallback(gotResult)

def gotResult(result):
    print 'insert_result:', result['insert_result']
    tm=time.time()
    print "stop", tm    

def error(reason):
    print "error", reason

tm=time.time()
print "start",tm
for i in range (10): #send data over ten times
  ClientCreator(reactor, amp.AMP).connectTCP(
     '127.0.0.1', 1234).addCallback(connected).addErrback(error)

reactor.run()

コードの終わり。

ありがとうございました。

4

1 に答える 1

4

サーバーコードを改善するものはほとんどありません。

何よりもまず、データベースへの直接アクセス機能の使用は、通常はブロックを引き起こすため、ねじれた状態ではお勧めできません。Twistedは、データベースアクセスのための優れた抽象化を備えており、データベース接続へのツイストアプローチを提供します-twisted.adbapi

次に、db接続の再利用について説明します。特定のアセット(データベース接続など)を複数のプロトコルインスタンスで再利用する場合は、Factoryのコンストラクターでそれらを初期化する必要があります。起動時にそのようなものを開始したくない場合は、リソースアクセスメソッド。最初のメソッド呼び出しでリソースを開始し、それをクラス変数に割り当てて、後続の呼び出しでそれを返します。

Factoryが特定のプロトコルインスタンスを作成すると、プロトコル内にそれ自体への参照が追加されます。twisted.internet.protocolの97行目を参照してください。

次に、Protocolインスタンス内で、次のような共有データベース接続インスタンスにアクセスできます。

self.factory.whatever_name_for_db_connection.doSomething() 

作り直されたサーバーコード(私はPython、ツイスト、またはまともなIDEを利用できないので、これはほとんどテストされていません。いくつかのエラーが予想されます)

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import time

class AMPDBAccessProtocolFactory(Factory):
    def getDBConnection(self):
        if 'dbConnection' in dir(self):
            return self.dbConnection
        else:
            self.dbConnection = SQLLiteTestConnection(self.dbURL)
            return self.dbConnection

class SQLLiteTestConnection(object):
    """
    Provides abstraction for database access and some business functions.
    """
    def __init__(self,dbURL):
        self.dbPool =  adbapi.ConnectionPool("sqlite3" , dbURL,  check_same_thread=False)

    def insertBTData4(self,data):
        query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)" 
        tm=time.time()
        print "insert time:",tm
        chx=data
        PID=2
        device_ID=5
        dF = self.dbPool.runQuery(query,(chx, PID, device_ID)) 
        dF.addCallback(self.onQuerySuccess,insert_data=data)
        return dF
    def onQuerySuccess(self,insert_data,*r):
        """
        Here you can inspect query results or add any other valuable information to be parsed at client.
        For the test sake we will just return True to a customer if query was a success.
        original data available at kw argument insert_data
        """
        return True


class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class MyAMPProtocol(amp.AMP):

    @Insert.responder
    def dbInsert(self, data):
        db = self.factory.getDBConnection()
        dF = db.insertBTData4(data)
        dF.addErrback(self.onInsertError,data)
        return dF

    def onInsertError(self, error, data):
        """
        Here you could do some additional error checking or inspect data 
        which was handed for insert here. For now we will just throw the same exception again
        so that the client gets notified
        """
        raise error

if __name__=='__main__':
    pf = AMPDBAccessProtocolFactory()
    pf.protocol = MyAMPProtocol
    pf.dbURL='biomed1.db'
    reactor.listenTCP(1234, pf) 
    reactor.run()

次に、クライアントに移ります。AMPが全体的なRPCロジックに従っている場合(現在はテストできません)、複数の呼び出しにわたって同じ接続を熟読できるはずです。そこで、その閲覧可能なプロトコルインスタンスを保持し、呼び出しの抽象化を提供するServerProxyクラスを作成しました。

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class ServerProxy(object):
    def connected(self,protocol):
        self.serverProxy = protocol # assign protocol as instance variable
        reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure

    def remote_insert(self,data):
        return self.serverProxy.callRemote(Insert, data)

    def startMultipleInsert(self):
        for i in range (10): #send data over ten times
            dF = self.remote_insert(i)
            dF.addCallback(self.gotInsertResult)
            dF.addErrback(error)

    def gotInsertResult(self,result):
        print 'insert_result:', str(result)
        tm=time.time()
        print "stop", tm    

def error(reason):
    print "error", reason


def main():
    tm=time.time()
    print "start",tm
    serverProxy = ServerProxy()
    ClientCreator(reactor, amp.AMP).connectTCP('127.0.0.1', 1234).addCallback(serverProxy.connected).addErrback(error)
    reactor.run()    

if __name__=='__main__':
    main()
于 2012-04-25T20:31:11.670 に答える