Linux でシリアル ポートへのアクセスを多重化しようとしています。シリアル ポートが 1 つしかない組み込みシステムを使用していますが、複数のプロセスが通信できると便利です。
一般的な使用例は次のとおりです。
- テストを実行する 1 つのメイン プログラム (コマンドの送信と出力の受信)。
- すべてのシリアル ポート アクティビティを別のログに記録します。
- テスト中に何らかのエラーが発生した後、追加のコマンドを送信したり、事後分析を実行したりするためにユーザー端末が開きます。
まず、n 個の疑似端末ペア (およびシリアル ポート) を開く単純な python スクリプトを作成し、poll ステートメントを使用して適切な場所に入出力を送信します。
# Removed boiler plate and error checking for clarity
##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking
##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()
# Print slave names so others know where to connect
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
pts.append(master)
##### Poller setup
poller = select.poll()
poller.register(ttyS.fd, select.POLLIN | select.POLLPRI)
for pt in pts:
poller.register(pt, select.POLLIN | select.POLLPRI)
##### MAIN
while True:
events = poller.poll(500)
for fd, flag in events:
# fd has input
if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
data = ttyS.read(80)
for pt in pts:
os.write(pt, data)
# Data on other pty
else:
ttyS.write(os.read(fd, 80))
このアプローチは、すべての pty が接続されている場合に非常にうまく機能します。接続されていない pty がある場合、最終的にそのバッファーがいっぱいになり、書き込み時にブロックされます。どのスレーブが接続されているか、または何らかのオンデマンドの pty を開くかを知る必要があるようです。
私はこの質問で巧妙なトリックを見つけました。その上で、男はシリアルポート部分からの読み取りだけを行う必要があるため、スクリプトを調整しました:
##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking
##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()
# slaves
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
os.close(slave) # POLLHUP trick
# masters
pts.append(master)
##### Poller setup
reader = select.poll()
writer = select.poll()
reader.register(ttyS, select.POLLIN | select.POLLPRI)
for pt in pts:
reader.register(pt, select.POLLIN | select.POLLPRI)
writer.register(pt, select.POLLIN | select.POLLPRI | select.POLLOUT)
def write_to_ptys(data):
events = writer.poll(500)
for fd, flag in events:
# There is someone on the other side...
if not (flag & select.POLLHUP):
os.write(fd, data)
##### MAIN
while True:
events = reader.poll(500)
for fd, flag in events:
if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
write_to_tty(ttyS.read(80))
# Data on other pty
else:
ttyS.write(os.read(fd, 80))
これは機能しますが、リーダーのポーリングに POLLHUP イベントが殺到するため、CPU を 100% 使用します。
疑似端末の代わりに TCP ソケットを使用すれば、必要なものが得られると思います。欠点は、ソケットを使用するために、端末で既に動作している他のすべてのスクリプトを変更する必要があることです (socat を使用できることはわかっていますが、もっと単純なものが必要なだけです)。その上、ネットワークのオーバーヘッドがすべてあります...
それで、何かアイデアはありますか?
セットアップが簡単である限り、他のツールを使用してもかまいません。他の言語を使用することも気にしません。Python が一番好きです。