2.14 haas506开发教程-高级组件库-tcp/usocket

1.tcp测试用例

  • 网络测试工具
    链接: 网络测试工具.
    在这里插入图片描述
    点击链接进入主界面后,点击"打开TCP",左上角会出现host:port,复制到main.py中的host、port处。
    在这里插入图片描述

  • main.py

代码主要功能:连接网络、网络校时获取实时时间、socket连接、socket发送/接收数据
修改host和port处的值,烧录代码。

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File       :    main.py
@Description:    
@Author     :    
@date       :   
@version    :    1.0
'''
import utime as time
import network
import usocket
import _thread  
import sntp     
import net as mynet

g_connect_status = False
g_tcp=False

global sock
host = "112.125.89.8"
port = 37496

def on_4g_cb(args):
    global g_connect_status
    pdp = args[0]
    netwk_sta = args[1]
    if netwk_sta == 1:
        g_connect_status = True
    else:
        g_connect_status = False

def connect_network():
    global net,on_4g_cb,g_connect_status
    net = network.NetWorkClient()
    g_register_network = False
    if net._stagecode is not None and net._stagecode == 3 and net._subcode == 1:
        g_register_network = True
    else:
        g_register_network = False
    if g_register_network:
        net.on(1,on_4g_cb)
        net.connect(None)
    else:
        print('network register failed')
    while True:
        if g_connect_status:
            print('network register successed')
            break
        time.sleep_ms(20)

def tcpsend(s):
    global g_tcp
    while True:
        try:
            print('tcp send gateway heartbeat...')
            ret=s.send("heartbeatbuf")
            time.sleep(5)
            g_tcp=True
        except OSError:
            g_tcp=False
            print('TCP OSError')
            break
        except:
            g_tcp=False
            break

def doConnect(host,port):
    global sock
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
    try :         
        sock.connect((host,port))
    except :
        pass
    return sock

def tcprecv(s):
    print('Running thread tcprecv...')
    while True:
        try:
            #接收信息并打印
            recvdata=s.recv(255)
        except OSError:
            print('TCP OSError')
        if len(recvdata)!=0:
            print(recvdata)
        time.sleep(1)

def main():   
    global sockLocal
    global sock
    print(host,port)   
    sockLocal = doConnect(host,port)
    _thread.start_new_thread(tcprecv, (sockLocal,))  
    while True:
        try :
            #获取当前时间
            #获取到的时间格式是 (年,月,日,时,分,秒,周日,年日)
            t=time.localtime()
            #按照一定格式输出时间
            t_time="{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(t[0],t[1],t[2],t[3],t[4],t[5])
            #将实时时间戳发送过去
            msg = str(t_time)
            sockLocal.send(msg) 
            print("send msg ok : ",msg)               
            # print("recv data :",sockLocal.recv(1024))
        except:
            print("\r\nsocket error,do reconnect ")
            time.sleep(3)
            sock.close()
            sockLocal = doConnect(host,port)   
        time.sleep(2)

if __name__ == '__main__':
    connect_network()#连接网络
    #校时
    sntp.settime()    
    time.sleep(1)
    print("csq:",mynet.csqQueryPoll()) 
    main()      

            
    
  • board.json
{
    "name": "haas506",
   "version": "1.0.0",
   "io": {            
      "serial1":{
        "type":"UART",
        "port":0,
        "dataWidth":8,
        "baudRate":115200,
        "stopBits":1,
        "flowControl":"disable",
        "parity":"none"
      },
      "serial2":{
        "type":"UART",
        "port":1,
        "dataWidth":8,
        "baudRate":115200,
        "stopBits":1,
        "flowControl":"disable",
        "parity":"none"
      },
      "serial3":{
        "type":"UART",
        "port":2,
        "dataWidth":8,
        "baudRate":115200,
        "stopBits":1,
        "flowControl":"disable",
        "parity":"none"
      }
   },
   "debugLevel": "ERROR",
   "repl":"enable",
   "replPort":0
  
  }
   

2.测试结果

  • 连接成功后,将时间戳发送过去
  • 利用网络测试工具发送字符串数据、HEX数据
  • 串口助手可以看到测试工具发过来的数据和被发送的时间戳数据
    在这里插入图片描述
    注意:网络测试工具一旦断开TCP连接,它的host:port数据会改变。
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐