3

計測器から TCP 経由でデータを取得するための Python ベースのインターフェイスを構築しています。データ ストリームは特定のイベントとして発生し、タイミングは安定していません。これらは小さなデータ パケットであるため、簡単にするために、完全なパケットとみなします。

ソケットから得られる動作は次のとおりです。

  • イベント #1 の送信: socket.recv がイベント #1 を返す
  • イベント #2 の送信: socket.recv がイベント #2 を返す
  • イベント #3-50 をすばやく送信: socket.recv はイベント #3-30 のみを返します (27 回返す)
  • ゆっくりとイベント #51 を送信します: ソケットの戻り値.recv イベント #31
  • ゆっくりとイベント #52 を送信します: ソケットの戻り値.recv イベント #32

データが失われることはありません。しかし、明らかにどこかにバッファがあり、ソケットは古いデータを返しています。しかし、そのバッファーが空になるまで、recv を返し続けるべきではありませんか? 代わりに、パケットのバッファが構築されているにもかかわらず、新しいパケットを受信したときにのみ返されます。変!

これがコードの本質です(これはノンブロッキング用です。recvだけでブロッキングも行いました-同じ結果です)。簡単にするために、すべてのパケット再構成要素を取り除きました。慎重にソケットまでたどったので、それが原因ではないことはわかっています。

class mysocket:
    def __init__(self,ip,port):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((ip,port))
        self.keepConn = True
        self.socket.setblocking(0)
        threading.Thread(target = self.rcvThread).start()
        threading.Thread(target = self.parseThread).start()

    def rcvThread(self):
        while self.keepConn:
            readable,writable,inError = select([self.socket],[self.socket],[],.1)
            if readable:
               packet = self.socket.recv(4096)
               self.recvqueue.put_nowait(packet)
            try:
               xmitmsg = self.sendqueue.get_nowait()
            except Queue.Empty:
               pass
            else:
               if writable:
                   self.socket.send(xmitmsg)

    def parseThread(self,rest = .1):
        while self.keepConn:
            try:
                output = self.recvqueue.get_nowait()
                eventnumber = struct.unpack('<H',output[:2]
                print eventnumber
            except Queue.Empty:
                sleep(rest)

ソケットがバッファ内のすべてのデータをダンプできないのはなぜですか? 絶対追いつかない!これは奇妙すぎる。誰もポインターを持っていますか?

私はアマチュアですが、これについては本当に宿題をやっていて、完全に困惑しています。

4

1 に答える 1