DDR爱好者之家 Design By 杰米
这个属于我项目中一个函数,跟大家分享一下我的思路及最终实现
在编写串口通信工具中,需要实现一个函数,自动找到对应com 口,并且连接该com口,保证后续通信正常
作为初始化过程的一部分。
思路
在win 设备管理器中,经常会出现多个com 口,但并不是每个com 口都是目标设备所链接的。
尝试打开每个com 口,输入enter 按键, 正确的com 口,会有ack log 返回,表明通信 正常
否则,没有任何log 返回,则判断为非目标设备所连接的com 口。
实现
尝试去打开所有com 口,然后发送enter, 如果在一段时间内有返回值,检查com 口收到的字节数,如果非零,则表明找到了对应的com 口。
完整测试代码如下:
import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime
# default value
baunRate = 115200
is_read = False
is_write = False
write_buff = []
sys_buff = []
mSerial = None
callback = None
is_opened = 0
is_registed = 0
class SerialPort:
def __init__(self,port,buand):
self.port = serial.Serial(port,buand)
self.port.close()
if not self.port.isOpen():
self.port.open()
#the index of data_bytes for read operation,私有属性
#only used in read lines
self.__read_ptr = 0
self.__read_head = 0
#store all read bytes
# used in read date, read lines
self.__data_bytes = bytearray()
def port_open(self):
if not self.port.isOpen():
self.port.open()
def port_close(self):
self.port.close()
def send(self):
global is_write
global write_buff
while is_write:
if len(write_buff):
msg = write_buff.pop(0)
msg = msg+"\n"
cmd = msg.encode()
try:
self.port.write(cmd)
except:
write_buff.clear()
is_write = False
write_buff.clear()
def read_data(self):
global is_read
global is_opened
byte_cnt = 0
while is_read:
try:
count = self.port.inWaiting()
if count > 0:
rec_str = self.port.read(count)
self.__data_bytes = self.__data_bytes+rec_str
#print("receive:",rec_str.decode())
#print(rec_str)
byte_cnt += count
if not is_opened:
is_opened = 1
#print("累计收到:",byte_cnt)
#time.sleep(0.5)
self.read_lines()
except:
deinit()
#将当前所有的数据都读出,读取位置不变,每次读取指针依次移动,不漏数据, 读取行为一直在进行
def read_lines(self):
#reset
line_cnt = 0
data_len = len(self.__data_bytes)
#print ("")
#print ("begin: prt=:", self.__read_ptr, " head =", self.__read_head,"current len =",data_len)
if self.__read_ptr >=data_len:
return
#get all lines in current data_bytes
while(self.__read_ptr < data_len-1):
if(self.__data_bytes[self.__read_ptr+1] == 0x0a and self.__data_bytes[self.__read_ptr] == 0x0d):
tmp = bytearray()
tmp = self.__data_bytes[self.__read_head:self.__read_ptr]
try:
line = tmp.decode()
except:
self.__read_head = self.__read_ptr + 2
self.__read_ptr = self.__read_head
continue
iprint(line)
line_cnt += 1
self.__read_head = self.__read_ptr + 2
self.__read_ptr = self.__read_head
else:
self.__read_ptr = self.__read_ptr + 1
def auto_open_serial():
global baunRate
global mSerial
global callback
global is_registed
global is_opened
#reset
deinit()
# 列出所有当前的com口
port_list = list(serial.tools.list_ports.comports())
port_list_name = []
#get all com
if len(port_list) <= 0:
iprint("the serial port can't find!")
return False
else:
for itms in port_list:
port_list_name.append(itms.device)
#try open
#print(port_list_name)
for i in port_list_name:
try:
mSerial = SerialPort(i,baunRate)
iprint("try open %s"%i)
start_task()
send("")
#return True
time.sleep(1)
if is_opened:
iprint("connect %s successfully"%i)
return True
else:
deinit()
if i == port_list_name[len(port_list_name)-1]:
iprint("uart don't open")
break
continue
except:
iprint(" uart don't open")
deinit()
return False
def deinit():
global mSerial
global is_write
global is_read
global write_buff
global is_opened
if mSerial:
mSerial.port_close()
is_opened = 0
is_read = False
is_write = False
write_buff = []
mSerial = None
time.sleep(1)
def init():
global mSerial
global callback
global is_registed
global is_opened
global is_read
global is_write
#retry
retry_time = 0
while not auto_open_serial():
if not is_opened:
iprint("wait for uart connect, retry %s"%str(retry_time))
else:
return True
retry_time += 1
time.sleep(2)
if retry_time == 10:
iprint(" open uart fail")
return False
def send(msg):
global mSerial
global is_write
global write_buff
if is_write:
write_buff.append(msg)
def start_task():
global mSerial
global is_write
global is_read
if mSerial:
is_write = True
t1 = threading.Thread (target=mSerial.send)
t1.setDaemon (False)
t1.start ()
is_read = True
t2 = threading.Thread (target=mSerial.read_data)
t2.setDaemon (False)
t2.start ()
def iprint(msg):
global callback
global is_registed
msg = "[Uart] "+str(msg)
if is_registed:
callback.append(msg)
else:
print(msg)
def start_sys_cmd():
global is_registed
if is_registed:
t3 = threading.Thread (target=process_receive_sys_cmd)
t3.setDaemon (False)
t3.start()
def process_receive_sys_cmd():
global sys_buff
global is_registed
global callback
#print("process_receive_sys_cmd")
while is_registed:
#print ("wait,process_receive_sys_cmd")
if len(sys_buff):
#print ("receive,process_receive_sys_cmd")
line = sys_buff.pop(0)
if "init" in line:
if is_opened and is_read and is_write:
iprint("already open uart")
break
iprint("start init")
init()
if is_opened:
break
iprint("Eixt uart sys thread")
def register_cback(list):
global callback
global is_registed
callback = list
is_registed = 1
def unregister_cback():
global callback
callback.clear()
if __name__ == '__main__':
receive = []
register_cback(receive)
sys_buff.append("init")
start_sys_cmd()
def process_receive_msg():
global receive
while True:
#print("wait")
if len(receive):
#print("receive")
print(receive.pop(0))
t = threading.Thread(target=process_receive_msg)
t.setDaemon(False)
t.start()
DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米
暂无评论...
更新日志
2025年10月27日
2025年10月27日
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]
