21

ループを実行しているコンピューターに Arduino を接続し、100 ミリ秒ごとにシリアル ポート経由でコンピューターに値を送信しています。

シリアルポートから数秒ごとにのみ読み取る Python スクリプトを作成したいので、Arduino から送信された最後のものだけを表示したいと考えています。

Pyserialでこれをどのように行いますか?

これが私が試したコードで、動作しません。行を順番に読み取ります。

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
4

10 に答える 10

28

おそらく私はあなたの質問を誤解していますが、それはシリアルラインであるため、Arduinoから送信されたすべてのものを順番に読み取る必要があります-読み取るまでArduinoにバッファリングされます.

送信された最新のものを示すステータス表示が必要な場合は、質問にコードを組み込んだスレッドを使用し(スリープを差し引いたもの)、最後の完全な行をArduinoからの最新の行として読み取ったままにします。

更新: mtasicのサンプル コードは非常に優れていinWaiting()ますが、が呼び出されたときに Arduino が部分的な行を送信した場合、切り捨てられた行が表示されます。代わりに、最後の完全な行をに入れlast_received、部分的な行を保持しbufferて、次回のループに追加できるようにする必要があります。このようなもの:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

の使用についてreadline(): Pyserial のドキュメントに記載されている内容は次のとおりです (明確にするために少し編集し、readlines() に言及しています)。

「readline」を使用するときは注意してください。シリアルポートを開くときはタイムアウトを指定してください。そうしないと、改行文字が受信されない場合に永久にブロックされる可能性があります。また、「readlines()」はタイムアウトでのみ機能することに注意してください。タイムアウトがあるかどうかに依存し、それを EOF (ファイルの終わり) として解釈します。

これは私にはかなり理にかなっているように思えます。

于 2009-07-07T17:28:01.933 に答える
11
from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()
于 2009-07-07T17:27:56.020 に答える
7

これらのソリューションは、文字を待っている間、CPUを占有します。

read(1)に対して少なくとも1回のブロッキング呼び出しを行う必要があります

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

...そして前と同じように分割します。

于 2010-10-22T18:53:23.767 に答える
3

この方法では、各行のすべてのデータを収集するためのタイムアウトと、追加の行を待機するための別のタイムアウトを個別に制御できます。

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines
于 2013-03-22T13:56:34.560 に答える
2

.inWaiting()無限ループ内で使用すると問題が発生する場合があります。実装によっては、CPU全体を占有する場合があります。代わりに、特定のサイズのデータ​​を使用して読み取ることをお勧めします。したがって、この場合、たとえば次のようにする必要があります。

ser.read(1024)
于 2011-06-09T22:59:22.520 に答える
2

mtasic と Vinay Sajip のコードを少し変更:

このコードは、同様のアプリケーションで非常に役立つことがわかりましたが、定期的に情報を送信するシリアル デバイスから戻ってくるすべての回線が必要でした。

最初の要素を先頭からポップして記録し、残りの要素を新しいバッファーとして再結合して、そこから続行することにしました。

これはグレッグが求めていたものではないことはわかっていますが、補足として共有する価値があると思いました.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)
于 2009-07-17T18:40:17.530 に答える
2

readline() への最後の呼び出しがタイムアウトになるまでブロックして、送信されたすべてを読み取るループが必要になります。そう:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data
于 2009-07-07T17:44:05.583 に答える