python3 虚拟串口实现与测试

注意

以下代码仅适用于Unix系统,windows系统不支持。

虚拟串口

作用:打开两个虚拟串口,当接收到数据时,将16进制数据原模原样返回给串口缓冲区

#!/usr/bin/env python
# coding=utf-8

import pty
import os
import select
import binascii
import time


def mkpty():

    master1, slave = pty.openpty()
    slaveName1 = os.ttyname(slave)
    master2, slave = pty.openpty()
    slaveName2 = os.ttyname(slave)

    print('\nslave device names:', slaveName1, slaveName2)
    return master1, master2


if __name__ == "__main__":

        master1, master2 = mkpty()
        print("---------------------------")
        while True:
            # rl=read list, wait until ready to reading
            # wl=write list, wait until ready to writing
            # el=exception list, wait for an "exceptional condition"
            # timeout = 1s

            rl, wl, el = select.select([master1, master2], [], [], 1)
            for device in rl:
                data = os.read(device, 128)
                print(time.strftime("%Y-%m-%d,%H:%M:%S", time.localtime()))
                print("read %d data."%len(data))
                print(str(binascii.b2a_hex(data))[2:-1])
                if device == master1:
                    print("正在回传pts0")
                    os.write(master1, data)
                    print("---------------------------")
                else:
                    print("正在回传pts1")
                    os.write(master2, data)

测试用例

import serial
import time
import binascii
import threading

class test_serial:
    def __init__(self,):
        self.portx = "/dev/pts/0"
        self.bps = 115200
        self.timex = 5
        self.ser = None
        self.ser = self.open_serial()

    def open_serial(self,):
        """

        :return: 串口实例化
        """
        try:
            ser = serial.Serial(self.portx, self.bps, timeout=self.timex)
        except Exception as e:
            print(f'Error. Can not open the serial {e}')
            time.sleep(5)
            ser = self.open_serial()

        return ser

    @staticmethod
    def xor_check(response,):
        XOR_status = int('00', 16)
        response_len = len(response)
        # print(f"数据长度: {response_len}")
        i = 0
        while i < response_len-2:
            # print(f"当前i的值: {i}")
            XOR_status = XOR_status ^ int(response[i:i+2], 16)
            i += 2
            # print(f"该步XOR的值: {XOR_status}")
        # print(XOR_status)
        # print(int(response[response_len-2:response_len], 16))
        if XOR_status == int(response[response_len-2:response_len], 16):
            return True
        else:
            return False

    def serial_response_check(self,):
        """

        :return:
        """
        # ser = open_serial()
        while True:
            tmp = ""
            ser_size = self.ser.inWaiting()
            if ser_size >= 2:
                ser_top = self.ser.read(2)  # 获取一组data的top
                ser_top = str(binascii.b2a_hex(ser_top))[2:-1]
                if ser_top == "55aa":  # 检查是否为一组数据的头
                    tmp = tmp + ser_top

                    ser_cnt = self.ser.read(1)  # 获取一组data的数据位长度
                    ser_cnt = str(binascii.b2a_hex(ser_cnt))[2:-1]
                    tmp = tmp + ser_cnt

                    ser_response = self.ser.read(int(ser_cnt)+2)  # 获取顺序位+数据+校验位
                    ser_response = str(binascii.b2a_hex(ser_response))[2:-1]
                    tmp = tmp + ser_response
                    print(tmp, len(tmp))

                    XOR_check = self.xor_check(tmp)  # XOR校验
                    print(XOR_check)
                    if XOR_check:
                        serial_data = tmp[8:8+int(ser_cnt)*2]
                        print(f"该数据值为: {serial_data}")


if __name__ == '__main__':

    test_class = test_serial()
    ttyUSB_ser = test_class.open_serial()  # 全局串口变量
    try:
        threading.Thread(target=test_class.serial_response_check).start()
        time.sleep(1)
        while True:
            ttyUSB_ser.write(bytes.fromhex("55 AA 02 00 0A 01 F6"))

            # time.sleep(2)

            ttyUSB_ser.write(bytes.fromhex("55 AA 02 00 0A 00 F7"))
            ttyUSB_ser.write(bytes.fromhex("55 AA 04 00 C5 01 39 05 03"))

    except KeyboardInterrupt:
        ttyUSB_ser.close()

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐