# Serial port debugging assistant (pyserial + ttkbootstrap GUI).
import serial
import serial.tools.list_ports
import threading
import queue
import os
import time
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.dialogs import Messagebox
from ttkbootstrap.scrolled import ScrolledText
from tkinter import BooleanVar, StringVar, IntVar
import platform
class SerialTool:
    """Serial-port debugging assistant window built on ttkbootstrap.

    The window has a configuration column on the left (port, baud rate,
    parity, data bits, stop bits and action buttons) and, on the right, a
    send area above a receive area. Incoming bytes are read on a background
    thread and handed to the Tk thread through ``receive_queue``; the Tk
    thread polls that queue every 100 ms via ``process_queue``.
    """

    # Baud rates offered in the drop-down; the first entry is the default.
    BAUD_RATES = ('9600', '115200', '57600', '38400',
                  '19200', '14400', '4800', '2400', '1200')
    # Queue polling period of the Tk thread, in milliseconds.
    POLL_INTERVAL_MS = 100
    # Longest time close_serial() waits for the receive thread, in seconds.
    WORKER_JOIN_TIMEOUT = 1.0

    def __init__(self, master):
        """Build the UI inside *master* and start polling the receive queue.

        Args:
            master: The top-level ttkbootstrap/Tk window hosting the tool.
        """
        self.master = master
        self.master.title("串口调试助手 v1.0---开发:DIY爱好者")
        self.master.geometry("900x520")
        self.master.resizable(False, False)  # fixed-size window
        self.master.update()  # apply the size restriction immediately

        # Styling: label frames get a thin, light-grey solid border.
        self.style = ttk.Style(theme='cosmo')
        self.style.configure('BlackBorder.TLabelframe', bordercolor='#D3D3D3',
                             relief='solid', borderwidth=1)

        # Serial/session state.
        self.serial_port = None
        self.receive_queue = queue.Queue()
        self.auto_send_flag = False
        self.send_count = 0      # bytes written to the port
        self.receive_count = 0   # bytes read from the port
        self.receive_thread = None
        self.receive_thread_event = threading.Event()  # set => worker must stop

        # Identifiers of pending Tk ``after`` callbacks, kept so they can be
        # cancelled instead of piling up or firing during shutdown.
        self._auto_send_job = None
        self._poll_job = None

        self.create_widgets()
        self.refresh_ports()
        self._poll_job = self.master.after(self.POLL_INTERVAL_MS, self.process_queue)

    # ------------------------------------------------------------------ UI

    def create_widgets(self):
        """Create the overall layout: config column, send/receive areas, status bar."""
        main_frame = ttk.Frame(self.master)
        main_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)

        # Left: serial configuration.
        left_frame = ttk.Labelframe(main_frame, text="串口配置", padding=15,
                                    style='BlackBorder.TLabelframe')
        left_frame.grid(row=0, column=0, sticky=NSEW, padx=5, pady=5)

        # Right: container holding the send area above the receive area.
        right_frame = ttk.Frame(main_frame)
        right_frame.grid(row=0, column=1, sticky=NSEW, padx=5, pady=5)

        send_frame = ttk.Labelframe(right_frame, text="数据发送", padding=15,
                                    style='BlackBorder.TLabelframe')
        send_frame.pack(fill=BOTH, expand=True, side=TOP)

        recv_frame = ttk.Labelframe(right_frame, text="数据接收", padding=15,
                                    style='BlackBorder.TLabelframe')
        recv_frame.pack(fill=BOTH, expand=True, side=TOP)

        # Let the right-hand column absorb all spare space.
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(0, weight=1)

        self.create_serial_controls(left_frame)
        self.create_send_controls(send_frame)
        self.create_recv_controls(recv_frame)

        # Status bar along the bottom edge of the window.
        self.status_var = StringVar(value="就绪")
        ttk.Label(self.master, textvariable=self.status_var,
                  bootstyle=(SECONDARY, INVERSE)).pack(fill=X, side=BOTTOM)

    @staticmethod
    def _add_combobox_row(parent, row, label, values=(), default=None):
        """Add a "label + combobox" row to *parent* and return the combobox."""
        ttk.Label(parent, text=label).grid(row=row, column=0, padx=5, pady=5, sticky=W)
        combo = ttk.Combobox(parent, values=list(values), width=15)
        if default is not None:
            combo.set(default)
        combo.grid(row=row, column=1, padx=5, pady=5)
        return combo

    def create_serial_controls(self, parent):
        """Create the serial parameter selectors and the action buttons.

        Args:
            parent: Container widget for the configuration column.
        """
        param_frame = ttk.Frame(parent)
        param_frame.pack(fill=X)

        self.port_cb = self._add_combobox_row(param_frame, 0, "COM端口:")
        self.baudrate_cb = self._add_combobox_row(
            param_frame, 1, "波特率:", self.BAUD_RATES, '9600')
        self.parity_cb = self._add_combobox_row(
            param_frame, 2, "校验位:", ('None', 'Even', 'Odd', 'Mark', 'Space'), 'None')
        self.databits_cb = self._add_combobox_row(
            param_frame, 3, "数据位:", ('8', '7', '6', '5'), '8')
        self.stopbits_cb = self._add_combobox_row(
            param_frame, 4, "停止位:", ('1', '1.5', '2'), '1')

        # Button row: three equally wide, stretchable columns.
        btn_frame = ttk.Frame(parent)
        btn_frame.pack(pady=10, fill=X)
        btn_frame.columnconfigure((0, 1, 2), weight=1, uniform='btns')

        ttk.Button(
            btn_frame,
            text="刷新端口",
            command=self.refresh_ports,
            bootstyle=OUTLINE,
        ).grid(row=0, column=0, padx=5, sticky="ew")

        # Green outline while disconnected; open_serial() switches it to red.
        self.conn_btn = ttk.Button(
            btn_frame,
            text="打开串口",
            command=self.toggle_connection,
            bootstyle=(OUTLINE, SUCCESS),
        )
        self.conn_btn.grid(row=0, column=1, padx=5, sticky="ew")

        ttk.Button(
            btn_frame,
            text="手动发送",
            command=self.send_data,
            bootstyle=(OUTLINE, PRIMARY),
        ).grid(row=0, column=2, padx=5, sticky="ew")

    def create_send_controls(self, parent):
        """Create the auto-send options and the outgoing text box.

        Args:
            parent: Container widget for the send area.
        """
        auto_frame = ttk.Frame(parent)
        auto_frame.pack(fill=X, pady=5)

        self.auto_var = BooleanVar()
        ttk.Checkbutton(auto_frame, text="自动发送", variable=self.auto_var,
                        command=self.toggle_auto_send).pack(side=LEFT)
        ttk.Label(auto_frame, text="间隔(ms):").pack(side=LEFT, padx=5)
        self.interval_entry = ttk.Entry(auto_frame, width=8)
        self.interval_entry.insert(0, "1000")
        self.interval_entry.pack(side=LEFT)

        # Text to transmit (the manual send button lives in the config column).
        self.send_text = ScrolledText(parent, height=4, autohide=True)
        self.send_text.pack(fill=BOTH, expand=True)

    def create_recv_controls(self, parent):
        """Create the received-data view, the byte counters and the clear button.

        Args:
            parent: Container widget for the receive area.
        """
        self.recv_text = ScrolledText(parent, height=5, autohide=True)
        self.recv_text.pack(fill=BOTH, expand=True)

        stat_frame = ttk.Frame(parent)
        stat_frame.pack(fill=X, pady=5)
        ttk.Label(stat_frame, text="发送:").pack(side=LEFT, padx=5)
        self.send_label = ttk.Label(stat_frame, text="0")
        self.send_label.pack(side=LEFT)
        ttk.Label(stat_frame, text="接收:").pack(side=LEFT, padx=10)
        self.recv_label = ttk.Label(stat_frame, text="0")
        self.recv_label.pack(side=LEFT)
        ttk.Button(stat_frame, text="清空", command=self.clear_received,
                   bootstyle=(OUTLINE, WARNING)).pack(side=RIGHT)

    # ---------------------------------------------------------- connection

    def _is_connected(self):
        """Return True when a serial port object exists and reports itself open."""
        return bool(self.serial_port and self.serial_port.is_open)

    def refresh_ports(self):
        """Re-enumerate serial ports and show how many were found in the status bar."""
        try:
            ports = [p.device for p in serial.tools.list_ports.comports()]
            self.port_cb['values'] = ports
            self.status_var.set(f"自动检测到主板有{len(ports)} 个串口可用,请注意选择正确的。")
        except Exception as e:
            print(f"Error refreshing ports: {e}")
            self.status_var.set(f"刷新端口时出错: {e}")

    def toggle_connection(self):
        """Close the port if it is open, otherwise open it."""
        if self._is_connected():
            self.close_serial()
        else:
            self.open_serial()

    def open_serial(self):
        """Open the selected port with the chosen settings and start the reader thread.

        Any failure (no port selected, non-numeric setting, port cannot be
        opened) is reported in an error dialog and the status bar.
        """
        try:
            port = self.port_cb.get()
            if not port:
                raise ValueError("请选择串口")
            parity_map = {
                'None': serial.PARITY_NONE,
                'Even': serial.PARITY_EVEN,
                'Odd': serial.PARITY_ODD,
                'Mark': serial.PARITY_MARK,
                'Space': serial.PARITY_SPACE,
            }
            self.serial_port = serial.Serial(
                port=port,
                baudrate=int(self.baudrate_cb.get()),
                parity=parity_map[self.parity_cb.get()],
                bytesize=int(self.databits_cb.get()),
                stopbits=float(self.stopbits_cb.get()),
                timeout=0.1,  # bounds how long the reader thread blocks per read
            )
            # Red outline signals that the next click will close the port.
            self.conn_btn.configure(text="关闭串口", bootstyle=(OUTLINE, DANGER))
            self.status_var.set(f"已连接 {port}")
            self.receive_thread_event.clear()
            self.receive_thread = threading.Thread(target=self.receive_worker, daemon=True)
            self.receive_thread.start()
        except Exception as e:
            Messagebox.show_error(f"主板上没有这个串口或你选的被测端口跟主板端口不对应,请在设备管理器中确认正确的端口: {str(e)}", "错误")
            self.status_var.set("连接失败")

    def close_serial(self):
        """Stop the reader thread, close the port and reset the connect button."""
        self.receive_thread_event.set()  # ask the worker to stop
        worker = self.receive_thread
        if worker is not None and worker.is_alive():
            # Bounded wait so a stuck driver call cannot freeze the UI forever.
            worker.join(timeout=self.WORKER_JOIN_TIMEOUT)
        self.receive_thread = None
        if self.serial_port:
            try:
                self.serial_port.close()
            except Exception as e:
                print(f"关闭串口时出错: {e}")
        # Same look as the freshly created button (green outline = can open).
        self.conn_btn.configure(text="打开串口", bootstyle=(OUTLINE, SUCCESS))
        self.status_var.set("已断开连接")

    # ------------------------------------------------------------- receive

    def receive_worker(self):
        """Background loop that moves incoming bytes into ``receive_queue``.

        Runs until the stop event is set, the port is closed, or a read fails.
        """
        port = self.serial_port
        stop_event = self.receive_thread_event
        while not stop_event.is_set() and port is not None and port.is_open:
            try:
                # Read everything already buffered; when nothing is buffered,
                # request one byte so the call blocks for up to the port's
                # read timeout instead of spinning at full CPU.
                data = port.read(port.in_waiting or 1)
            except Exception as e:
                print(f"接收错误: {e}")
                break
            if data:
                self.receive_queue.put(data)

    def process_queue(self):
        """Drain ``receive_queue`` into the receive view, then reschedule itself."""
        try:
            updated = False
            while True:
                try:
                    data = self.receive_queue.get_nowait()
                except queue.Empty:
                    break
                self.display_received(data)
                self.receive_count += len(data)
                updated = True
            if updated:
                self.recv_label.configure(text=str(self.receive_count))
        finally:
            # Always reschedule, so one bad chunk cannot stop polling for good.
            self._poll_job = self.master.after(self.POLL_INTERVAL_MS, self.process_queue)

    @staticmethod
    def _decode_payload(data):
        """Return *data* as UTF-8 text, or as space-separated hex if it is not valid UTF-8."""
        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            return data.hex(' ')

    def display_received(self, data):
        """Append one received chunk (followed by a newline) to the receive view.

        Args:
            data: Raw bytes read from the serial port.
        """
        self.recv_text.insert(END, self._decode_payload(data) + '\n')
        self.recv_text.see(END)

    # ---------------------------------------------------------------- send

    @staticmethod
    def _parse_interval(text):
        """Parse the auto-send interval in milliseconds.

        Raises:
            ValueError: If *text* is not an integer or is not positive.
        """
        interval = int(text.strip())
        if interval <= 0:
            raise ValueError(f"interval must be positive, got {interval}")
        return interval

    def _cancel_auto_send(self):
        """Cancel the pending auto-send timer, if any."""
        if self._auto_send_job is not None:
            self.master.after_cancel(self._auto_send_job)
            self._auto_send_job = None

    def _stop_auto_send(self):
        """Turn auto-send off and bring the checkbox in line with that state."""
        self._cancel_auto_send()
        self.auto_send_flag = False
        self.auto_var.set(False)

    def toggle_auto_send(self):
        """React to the auto-send checkbox being toggled."""
        # Drop any timer left over from a previous activation so that quickly
        # toggling off and on cannot leave two send loops running.
        self._cancel_auto_send()
        self.auto_send_flag = self.auto_var.get()
        if not self.auto_send_flag:
            return
        if not self._is_connected():
            self._stop_auto_send()
            Messagebox.show_warning("请先打开串口", "警告")
            return
        self.auto_send_task()

    def auto_send_task(self):
        """Send once and reschedule while auto-send is on and the port is open."""
        self._auto_send_job = None
        if not self.auto_send_flag:
            return
        if not self._is_connected():
            # Port was closed meanwhile: stop and uncheck the box.
            self._stop_auto_send()
            return
        try:
            interval = self._parse_interval(self.interval_entry.get())
        except ValueError:
            self._stop_auto_send()
            Messagebox.show_error("无效的间隔时间", "错误")
            return
        if not self.send_data():
            # Avoid raising the same error dialog on every tick.
            self._stop_auto_send()
            return
        self._auto_send_job = self.master.after(interval, self.auto_send_task)

    def send_data(self):
        """Transmit the stripped contents of the send box as UTF-8.

        Returns:
            bool: False if the port is not open or the write failed; True
            otherwise (including when there was nothing to send).
        """
        if not self._is_connected():
            Messagebox.show_warning("请先打开串口", "警告")
            return False
        text = self.send_text.get("1.0", END).strip()
        if not text:
            return True
        payload = text.encode('utf-8')
        try:
            written = self.serial_port.write(payload)
        except Exception as e:
            Messagebox.show_error(f"发送失败: {str(e)}", "错误")
            return False
        # Count bytes (not characters) so the figure is comparable with the
        # receive counter, which also counts bytes.
        self.send_count += len(payload) if written is None else written
        self.send_label.configure(text=str(self.send_count))
        return True

    def clear_received(self):
        """Clear the receive view and the send box, and reset both counters."""
        self.recv_text.delete("1.0", END)
        self.receive_count = 0
        self.recv_label.configure(text="0")
        self.send_text.delete("1.0", END)
        self.send_count = 0
        self.send_label.configure(text="0")

    # ------------------------------------------------------------ shutdown

    def on_closing(self):
        """Shut down cleanly when the window is closed."""
        # Stop timers first so no callback fires on a half-destroyed UI.
        self._stop_auto_send()
        if self._poll_job is not None:
            self.master.after_cancel(self._poll_job)
            self._poll_job = None
        self.close_serial()
        # Destroying the root window ends mainloop(). Nothing may be scheduled
        # on the window after this point: Tk raises TclError on a destroyed app.
        self.master.destroy()

    def force_exit(self):
        """Terminate the process immediately, skipping normal interpreter cleanup.

        Kept for callers that relied on it; ``on_closing`` no longer schedules it.
        """
        os._exit(0)
if __name__ == "__main__":
    # Script entry point: create the main window, attach the tool to it and
    # route the window-close button through the tool's shutdown handler
    # before entering the Tk event loop.
    window = ttk.Window()
    tool = SerialTool(window)
    window.protocol("WM_DELETE_WINDOW", tool.on_closing)
    window.mainloop()