python3 虚拟串口实现与测试
本文介绍如何在 Python 3 中使用 pty 模块实现虚拟串口,并给出对应的测试用例。虚拟串口的作用:打开两个虚拟串口,当接收到数据时,将数据原样返回给串口缓冲区。
·
注意
以下代码依赖 pty 模块,仅适用于 Unix 系统,Windows 系统不支持。
虚拟串口
作用:打开两个虚拟串口,当任一串口接收到数据时,以十六进制形式打印该数据,并将其原样回写到同一串口的缓冲区。
#!/usr/bin/env python
# coding=utf-8
import pty
import os
import select
import binascii
import time
def mkpty():
    """Open two pseudo-terminal pairs and return their master descriptors.

    The device name of each slave end is printed so that another program
    can open it as a serial port.  The slave descriptors themselves are
    neither closed nor returned.

    :return: tuple ``(master1, master2)`` of master file descriptors, in
        the order the pairs were opened.
    """
    master_fds = []
    slave_names = []
    for _ in range(2):
        master_fd, slave_fd = pty.openpty()
        master_fds.append(master_fd)
        slave_names.append(os.ttyname(slave_fd))
    print('\nslave device names:', slave_names[0], slave_names[1])
    return master_fds[0], master_fds[1]
if __name__ == "__main__":
    # Echo server: whatever arrives on either master is logged as hex and
    # written straight back to the same master.
    master1, master2 = mkpty()
    print("---------------------------")
    try:
        while True:
            # select() returns (read list, write list, exception list);
            # only the read list is used here.  The 1 s timeout keeps the
            # loop responsive when no data arrives.
            readable, _, _ = select.select([master1, master2], [], [], 1)
            for device in readable:
                data = os.read(device, 128)
                print(time.strftime("%Y-%m-%d,%H:%M:%S", time.localtime()))
                print("read %d data." % len(data))
                print(data.hex())
                if device == master1:
                    print("正在回传pts0")
                else:
                    print("正在回传pts1")
                # Echo on the descriptor the data came from.
                os.write(device, data)
                # Separator after every echo, not only for master1.
                print("---------------------------")
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
    finally:
        os.close(master1)
        os.close(master2)
测试用例
import serial
import time
import binascii
import threading
class test_serial:
    """Test client that reads framed responses from a serial port.

    Frame layout, as parsed by :meth:`read_frame` (hex, one byte each
    unless noted): ``55 AA`` header, data length ``n``, sequence byte,
    ``n`` data bytes, and a trailing XOR checksum of all preceding bytes.
    """

    def __init__(self,):
        self.portx = "/dev/pts/0"  # device path handed to serial.Serial
        self.bps = 115200          # baud rate handed to serial.Serial
        self.timex = 5             # read timeout handed to serial.Serial
        self.ser = self.open_serial()

    def open_serial(self,):
        """Open the configured port, retrying every 5 seconds until it works.

        :return: the opened ``serial.Serial`` instance
        """
        # Loop instead of recursing so a long outage cannot exhaust the
        # recursion limit.
        while True:
            try:
                return serial.Serial(self.portx, self.bps, timeout=self.timex)
            except Exception as e:
                print(f'Error. Can not open the serial {e}')
                time.sleep(5)

    @staticmethod
    def xor_check(response,):
        """Validate the trailing XOR checksum of a hex-encoded frame.

        :param response: the whole frame as a hex string, checksum byte last
        :return: ``True`` when the XOR of every byte except the last equals
            the last byte; ``False`` otherwise, including for empty or
            malformed hex input
        """
        try:
            if isinstance(response, (bytes, bytearray)):
                response = response.decode("ascii")
            raw = bytes.fromhex(response)
        except ValueError:
            return False
        if not raw:
            return False
        xor_status = 0
        for value in raw[:-1]:
            xor_status ^= value
        return xor_status == raw[-1]

    def read_frame(self,):
        """Read one frame from ``self.ser`` and return its data field.

        Two bytes are consumed as the header; if they are not ``55 AA``
        nothing else is read.  The frame and the checksum result are
        printed, and the data field is printed when the frame is valid.

        :return: the data bytes as a lowercase hex string when the frame is
            complete and its checksum matches, otherwise ``None``
        """
        header = self.ser.read(2).hex()
        if header != "55aa":
            return None
        count_byte = self.ser.read(1)
        if len(count_byte) != 1:
            # The read returned early; there is no length byte to parse.
            return None
        # The length is a raw byte value; parsing its hex text as decimal
        # breaks for any length of 0x0a or more.
        data_len = count_byte[0]
        # Sequence byte + data bytes + checksum byte.
        rest = self.ser.read(data_len + 2)
        frame = header + count_byte.hex() + rest.hex()
        print(frame, len(frame))
        # A truncated frame is never valid, whatever its last byte is.
        valid = len(rest) == data_len + 2 and self.xor_check(frame)
        print(valid)
        if not valid:
            return None
        # Skip header (4 hex chars), length (2) and sequence (2).
        serial_data = frame[8:8 + data_len * 2]
        print(f"该数据值为: {serial_data}")
        return serial_data

    def serial_response_check(self,):
        """Poll the port forever, parsing a frame whenever data is waiting.

        :return: never returns
        """
        while True:
            if self.ser.inWaiting() < 2:
                # Yield briefly instead of spinning while the line is idle.
                time.sleep(0.01)
                continue
            self.read_frame()
if __name__ == '__main__':
    # Driver: one thread parses incoming frames while the main thread
    # keeps writing three sample frames to the port.
    test_class = test_serial()
    # Second handle on the same port, used only for writing.
    ttyUSB_ser = test_class.open_serial()
    try:
        # daemon=True: the reader loop never returns, so a non-daemon
        # thread would keep the process alive after Ctrl-C.
        threading.Thread(
            target=test_class.serial_response_check, daemon=True
        ).start()
        time.sleep(1)
        while True:
            ttyUSB_ser.write(bytes.fromhex("55 AA 02 00 0A 01 F6"))
            # time.sleep(2)
            ttyUSB_ser.write(bytes.fromhex("55 AA 02 00 0A 00 F7"))
            ttyUSB_ser.write(bytes.fromhex("55 AA 04 00 C5 01 39 05 03"))
    except KeyboardInterrupt:
        ttyUSB_ser.close()
更多推荐
所有评论(0)