6

Linux でシリアル ポートへのアクセスを多重化しようとしています。シリアル ポートが 1 つしかない組み込みシステムを使用していますが、複数のプロセスが通信できると便利です。

一般的な使用例は次のとおりです。

  • テストを実行する 1 つのメイン プログラム (コマンドの送信と出力の受信)。
  • すべてのシリアル ポート アクティビティを別のログに記録します。
  • テスト中に何らかのエラーが発生した後、追加のコマンドを送信したり、事後分析を実行したりするためにユーザー端末が開きます。

まず、n 個の疑似端末ペア (およびシリアル ポート) を開く単純な python スクリプトを作成し、poll ステートメントを使用して適切な場所に入出力を送信します。

# Removed boiler plate and error checking for clarity

##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking

##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
    master, slave = os.openpty()
    # Print slave names so others know where to connect
    print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
    pts.append(master)

##### Poller setup
poller = select.poll()
poller.register(ttyS.fd, select.POLLIN | select.POLLPRI)
for pt in pts:
    poller.register(pt, select.POLLIN | select.POLLPRI)

##### MAIN
while True:

events = poller.poll(500)

for fd, flag in events:

    # fd has input
    if flag & (select.POLLIN | select.POLLPRI):
        # Data on serial
        if fd == ttyS.fd:
            data = ttyS.read(80)
            for pt in pts:
                os.write(pt, data)

        # Data on other pty
        else:
            ttyS.write(os.read(fd, 80))

このアプローチは、すべての pty が接続されている場合に非常にうまく機能します。接続されていない pty がある場合、最終的にそのバッファーがいっぱいになり、書き込み時にブロックされます。どのスレーブが接続されているか、または何らかのオンデマンドの pty を開くかを知る必要があるようです。

私はこの質問で巧妙なトリックを見つけました。その上で、男はシリアルポート部分からの読み取りだけを行う必要があるため、スクリプトを調整しました:

##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking

##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
    master, slave = os.openpty()

    # slaves
    print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
    os.close(slave) # POLLHUP trick

    # masters
    pts.append(master)

##### Poller setup
reader = select.poll()
writer = select.poll()

reader.register(ttyS, select.POLLIN | select.POLLPRI)
for pt in pts:
    reader.register(pt, select.POLLIN | select.POLLPRI)
    writer.register(pt, select.POLLIN | select.POLLPRI | select.POLLOUT)

def write_to_ptys(data):
    events = writer.poll(500)

    for fd, flag in events:

        # There is someone on the other side...
        if not (flag & select.POLLHUP):
            os.write(fd, data)

##### MAIN
while True:
    events = reader.poll(500)

    for fd, flag in events:

    if flag & (select.POLLIN | select.POLLPRI):
        # Data on serial
        if fd == ttyS.fd:
            write_to_tty(ttyS.read(80))
        # Data on other pty
        else:
            ttyS.write(os.read(fd, 80))

これは機能しますが、リーダーのポーリングに POLLHUP イベントが殺到するため、CPU を 100% 使用します。

疑似端末の代わりに TCP ソケットを使用すれば、必要なものが得られると思います。欠点は、ソケットを使用するために、端末で既に動作している他のすべてのスクリプトを変更する必要があることです (socat を使用できることはわかっていますが、もっと単純なものが必要なだけです)。その上、ネットワークのオーバーヘッドがすべてあります...

それで、何かアイデアはありますか?

セットアップが簡単である限り、他のツールを使用してもかまいません。他の言語を使用することも気にしません。Python が一番好きです。

4

1 に答える 1

6

最後に、やりたくないと言ったのと同じように、単純なTCPサーバーを書きました...しかし、それは本当にうまく機能します。質問のコードと同じ一般的なアーキテクチャを使用しますが、疑似端末の代わりに TCP ソケットを使用します。

誰かがそれを使用したい場合に備えて、ここに投稿しました。

それを呼び出すには:

md:mux_serial> ./mux_server.py --device /dev/ttyS0 --baud 115200 --port 23200
MUX > Serial port: /dev/ttyS0 @ 115200
MUX > Server: localhost:23200

socatポートに直接アクセスするために別の端末で使用しています...

md:~> socat -,raw,echo=0,escape=0x0f TCP4:localhost:23200

...またはそれらを必要とするスクリプト内で使用する疑似端末を作成するには:

md:~> socat -d -d pty,raw,echo=0 TCP4:localhost:23200
2012/10/01 13:08:21 socat[3798] N PTY is /dev/pts/4
2012/10/01 13:08:21 socat[3798] N opening connection to AF=2 127.0.0.1:23200
2012/10/01 13:08:21 socat[3798] N successfully connected from local address AF=2 127.0.0.1:35138
2012/10/01 13:08:21 socat[3798] N starting data transfer loop with FDs [3,3] and [5,5]
于 2012-10-01T16:08:58.287 に答える