1

こんにちは私はツイストjsonrpcサーバーへのrpc呼び出しを行ういくつかのマイクロコントローラーにサービスを提供するツイストに基づくrpcサーバーの開発に取り組んでいます。ただし、アプリケーションではサーバーがいつでも各マイクロに情報を送信する必要があるため、マイクロからのリモートjsonrpc呼び出しからの応答が、ユーザー。

私が今持っている結果は、マイクロが悪い情報を受け取っているということです。なぜなら、ソケットから来ているnetstring / json文字列が以前の要件からの応答なのか、サーバーからの新しい要求なのかわからないからです。

これが私のコードです:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
4

1 に答える 1

2

受信者がメッセージの解釈方法を判断できるように、各メッセージに十分な情報を追加する必要があります。要件はAMPの要件と非常によく似ているため、代わりに AMP を使用するか、AMP と同じ構造を使用してメッセージを識別できます。具体的には:

  • リクエストに特定のキーを入力します。たとえば、AMP は「_ask」を使用してリクエストを識別します。また、これらに一意の値を与え、接続の存続期間中、その要求をさらに識別します。
  • 応答では、別のキーを入力します。たとえば、AMP はこれに「_answer」を使用します。この値は、応答の対象となるリクエストの「_ask」キーの値と一致します。

このようなアプローチを使用すると、「_ask」キーまたは「_answer」キーがあるかどうかを確認するだけで、新しいリクエストを受信したか、以前のリクエストに対する応答を受信したかを判断できます。

別のトピックでは、asyncGatewayCallsクラスはスレッドベースであってはなりません。スレッドを使用する明らかな理由はなく、そうすることで、未定義の動作につながる方法で Twisted API を悪用しています。ほとんどの Twisted API は、 を呼び出したスレッドでのみ使用できますreactor.run。唯一の例外はreactor.callFromThread、他のスレッドからリアクタ スレッドにメッセージを送信するために使用できる です。 asyncGatewayCallsただし、トランスポートに書き込もうとすると、バッファが破損したり、送信されるデータに任意の遅延が発生したり、さらに悪いことが起こる可能性があります。asyncGatewayCalls代わりに、次のように書くことができます。

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

asyncGatewayCallsこれにより、元のクラスで意図したのと同じ動作をするシングルスレッド ソリューションが得られます。ただし、呼び出しをスケジュールするためにスレッド内のループでスリープする代わりに、reactor のスケジューリング API を使用して (繰り返し呼び出されるものをスケジュールする上位レベルの LoopingCall クラスを介して)、確実_pokeMicroに 10 秒ごとに呼び出されるようにします。

于 2010-12-08T15:08:22.393 に答える