好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 sty19890218 于 2026-1-20 15:35 编辑
Modbus-RTU 通信工具(V3.1)详细使用说明书
文档说明
本文档为 Modbus-RTU 通信工具(V3.1)的全流程操作指南,覆盖从软件启动到高级功能使用的每一步操作,适配零基础用户快速上手,同时满足工业调试场景的专业需求。
目录
1.工具基础信息
2.环境部署与启动
3.串口配置(逐步骤)
4.串口参数自动扫描(逐步骤)
5.核心功能操作(逐步骤)
6.数据可视化操作
7.扩展功能(别名 / 报警)
8.快捷键与故障排查
9.配置文件说明
- 工具基础信息
1.1 功能清单
功能模块 核心能力
串口管理 自动检测串口、参数配置、多从站支持
参数自动扫描 遍历 48 组参数组合,自动匹配目标设备的波特率 / 校验位 / 停止位
数据抓包 精准解析 03/07/11 功能码帧,显示寄存器地址 + 数值
读写操作 支持 01/02/03/04/05/06/07/08/11/15/16 功能码的读 / 写 / 诊断操作
批量读取 自定义多功能码批量读取,支持自动遍历所有读功能码
数据可视化 实时绘制单个寄存器数值曲线
数据导出 通信日志导出 TXT、结构化数据导出 Excel
扩展功能 寄存器别名配置、数值超限报警(蜂鸣 + 日志高亮)
1.2 功能码规范(必看)
功能码 名称 操作类型 原始地址→规范地址示例 最大操作数量
01 读线圈寄存器 读 0→00001 2000
02 读离散输入寄存器 读 0→00001 2000
03 读保持寄存器 读 0→40001 125
04 读输入寄存器 读 0→30001 125
05 写单个线圈 写(单) 0→00001 1
06 写单个保持寄存器 写(单) 0→40001 1
07 读取异常状态 读 无规范地址 1
08 诊断 诊断 无规范地址 1
11 获取通信事件计数器 读 无规范地址 1
15 写多个线圈 写(多) 0→00001 1968
16 写多个保持寄存器 写(多) 0→40001 123
- 环境部署与启动
步骤 1:安装 Python 环境
1.下载 Python 3.7 及以上版本(推荐 3.9):https://www.python.org/downloads/
2.安装时勾选「Add Python to PATH」,点击「Install Now」完成安装。
步骤 2:安装依赖库
1.按下「Win+R」,输入「cmd」打开命令提示符;
2.依次执行以下命令(复制后粘贴回车):
bash
运行
pip install pyserial openpyxl matplotlib
注:tkinter 为 Python 自带库,若提示缺失,执行pip install tkinter补充安装。
步骤 3:启动工具
1.将「485 抓包 V3.1.py」文件保存至本地(如桌面);
2.双击该文件,或在命令提示符中执行:
bash
运行
python 桌面\485抓包V3.1.py
1.启动成功后,界面显示「Modbus-RTU 工具(V3.1)」标题,状态栏提示「就绪:未打开串口」。
- 串口配置(逐步骤)
步骤 1:检测串口
1.找到界面「串口配置」区域(左上角);
2.点击「检测串口」按钮;
3.若连接串口设备(如 485 转换器、PLC),串口下拉框会显示「COMx - 设备描述」(例:COM3 - USB-SERIAL CH340);
4.若提示「未检测到串口」,检查:
1.串口设备是否正确连接电脑;
2.设备驱动是否安装(如 CH340、PL2303 驱动);
3.串口是否被其他软件占用(如串口助手、PLC 编程软件)。
步骤 2:选择串口参数
参数项 操作步骤
串口 点击串口下拉框,选择检测到的串口(如 COM3)
波特率 点击波特率下拉框,选择目标值(常用 9600、19200、115200)
校验位 点击校验位下拉框,选择「无校验」「奇校验」「偶校验」(默认无校验)
停止位 点击停止位下拉框,选择「1」或「2」(默认 1)
数据位 固定为 8 位,无需配置
多从站地址 如需批量操作多从站,在输入框填写逗号分隔的地址(如 1,2,3),默认仅 1
步骤 3:打开串口
1.确认参数后,点击「打开串口」按钮;
2.弹出参数确认弹窗(例:9600 / 无校验 / 1/8),点击「是」;
3.成功后状态栏显示「就绪:串口已打开 | 参数:9600 / 无校验 / 1/8」,且「开始抓包」「执行操作」等按钮从灰色变为可点击;
4.若失败,弹窗提示错误原因(如「串口被占用」「参数错误」),需排查后重新操作。
- 串口参数自动扫描(逐步骤)
适用场景:未知目标设备的串口参数,需自动匹配
步骤 1:准备工作
1.确保串口设备已连接,且完成「检测串口」并选择目标串口;
2.关闭其他占用该串口的软件(避免扫描失败)。
步骤 2:启动扫描
1.点击「扫描参数」按钮;
2.状态栏提示「正在扫描:自动匹配合法 03 帧参数」,日志区域显示「【扫描开始】优先测试停止位 = 1 的参数组合」;
3.工具自动遍历 48 组参数组合,每扫描一组,日志显示「【扫描 X/48】波特率 / 校验位 / 停止位 / 数据位」。
步骤 3:扫描结果处理
1.若找到有效参数:
1.日志显示「【找到有效参数】9600 / 无校验 / 1/8 | 捕获合法 03 帧!」;
2.自动填充波特率 / 校验位 / 停止位参数,并弹窗提示「找到有效参数」;
3.点击「确定」后,可直接点击「打开串口」使用匹配的参数。
2.若未找到有效参数:
1.日志显示「【扫描完成】未找到合法参数!」;
2.弹窗提示「扫描完成:未找到合法参数」;
3.排查:确认设备是否上电、485 接线是否正确、设备是否支持 Modbus-RTU 03 功能码。
步骤 4:停止扫描(可选)
扫描过程中如需终止,点击「停止操作」按钮,工具立即停止扫描并恢复初始状态。
- 核心功能操作(逐步骤)
5.1 数据抓包(实时解析 03/07/11 功能码)
前提:已成功打开串口
步骤 1:启动抓包
1.点击「开始抓包」按钮;
2.状态栏提示「正在抓包 | 参数:9600 / 无校验 / 1/8」,日志区域显示「【抓包开始】精准解析 03 功能码响应帧」。
步骤 2:解析数据查看
1.当串口有 03 功能码数据传输时,日志区域显示:
1.请求帧:[2024-05-20 10:00:00.000] 合法帧 | 请求帧 | 从站1 | 功能码03 | 起始地址0(规范地址40001) | 寄存器数量10 | 原始数据:01 03 00 00 00 0A C4 0B
2.响应帧:[2024-05-20 10:00:00.100] 响应帧:01 03 14 00 01 00 02 00 03 ...
3.寄存器数值:40001: 1 | 40002: 2 | 40003: 3 ...(深蓝色粗体)
2.若未捕获响应帧,日志显示「【超时】未捕获到从站 1 的响应帧」(橙色粗体)。
步骤 3:停止抓包
点击「停止操作」按钮,状态栏恢复「就绪:串口已打开」,抓包停止。
5.2 单功能码读写操作
前提:已打开串口,以 03 功能码(读保持寄存器)为例
步骤 1:配置操作参数
1.找到「读写操作参数」区域:
1.功能码:下拉框选择「03」;
2.从站地址:输入目标从站地址(如 1,范围 1-247);
3.起始地址:输入原始地址(如 0,对应规范地址 40001);
4.数量:输入读取数量(如 10,≤125);
5.读取间隔:输入 0.5(单位秒,≥0.1);
2.若为写操作(如 06 功能码):
1.功能码选择「06」;
2.起始地址输入 0(对应 40001);
3.写入值:输入要写入的数值(如 100);
4.读取间隔无需配置(写操作无间隔)。
步骤 2:执行操作
1.点击「执行操作」按钮;
2.若参数非法(如地址>247、数量>125),弹窗提示「参数超出合法范围」;
3.合法参数则启动操作,日志显示:
1.操作帧:[2024-05-20 10:10:00.000] 操作帧 | 从站1 | 功能码03 | 地址范围:40001 ~ 40010 | 寄存器数量10 | 原始数据:01 03 00 00 00 0A C4 0B
2.寄存器数值:实时显示每个寄存器的地址和数值。
步骤 3:停止操作
点击「停止操作」按钮,终止读写循环。
5.3 批量读取(多功能码自动遍历)
前提:已打开串口
步骤 1:配置批量参数
1.找到「批量读取参数」区域:
1.读取参数:默认值「01:0,10;02:0,10;03:0,10;04:0,10;07:0,1;11:0,1」,格式为「功能码:起始地址,数量;...」;
2.读取间隔:输入 0.2(单位秒,≥0.1);
3.勾选「自动读取所有读功能码」(默认勾选,自动遍历 01/02/03/04/07/11)。
步骤 2:启动批量读取
1.点击「批量读取」按钮;
2.状态栏提示「正在执行批量读取操作」,日志按顺序显示每个功能码的读取结果:
1.[2024-05-20 10:20:00.000] 操作帧 | 从站1 | 功能码01 | 地址范围:00001 ~ 00010 | 寄存器数量10 | 原始数据:01 01 00 00 00 0A FC 08
2.[2024-05-20 10:20:00.200] 操作帧 | 从站1 | 功能码02 | 地址范围:00001 ~ 00010 | 寄存器数量10 | 原始数据:01 02 00 00 00 0A 7C 08
步骤 3:停止批量读取
点击「停止操作」按钮,终止批量读取循环。
5.4 数据导出
方式 1:保存通信日志(TXT)
1.确保串口已打开(未打开则按钮灰色);
2.点击「保存日志 (TXT)」按钮;
3.弹出文件保存对话框,选择保存路径(如桌面),输入文件名(如「通信日志 20240520.txt」);
4.点击「保存」,日志文件包含所有通信记录(时间戳、帧数据、寄存器数值)。
方式 2:导出 Excel(结构化数据)
1.执行过读写 / 批量读取操作后,点击「导出 Excel」按钮;
2.选择保存路径,输入文件名(如「寄存器数据 20240520.xlsx」);
3.点击「保存」,Excel 文件包含列:时间戳、从站地址、功能码、寄存器地址、数值、别名(若配置)。
- 数据可视化操作
前提:已打开串口,且目标寄存器有数据读取
步骤 1:配置监控寄存器
1.切换到「数据可视化」标签页;
2.在「监控寄存器」输入框填写规范地址(如 40001,对应 03 功能码原始地址 0);
3.确认输入的地址已在「主功能」标签页中执行过读取操作(否则无数据)。
步骤 2:启动实时监控
1.点击「开始监控」按钮;
2.绘图区域自动绘制数值曲线:
1.X 轴:时间(秒,从监控开始计时);
2.Y 轴:寄存器数值;
3.曲线:实时刷新,蓝色实线显示数值变化。
步骤 3:清空曲线
点击「清空曲线」按钮,绘图区域重置,停止当前监控(需重新点击「开始监控」恢复)。
- 扩展功能(别名 / 报警)
7.1 寄存器别名配置
作用:将规范地址(如 40001)命名为易理解的名称(如「温度传感器」),日志中同步显示
步骤 1:打开别名配置
1.点击「寄存器别名」按钮;
2.弹出配置窗口(首次打开为空)。
步骤 2:添加 / 编辑别名
1.输入规范地址(如 40001);
2.输入别名(如「温度传感器 - 车间 1」);
3.点击「添加」,别名立即生效;
4.如需修改,选中已添加的别名,修改后点击「编辑」;
5.如需删除,选中后点击「删除」。
步骤 3:保存配置
配置完成后点击「保存」,别名数据自动保存至「reg_aliases.json」文件(工具同目录),重启工具后仍生效。
7.2 报警配置
作用:设置寄存器数值阈值,超出时触发蜂鸣 + 日志高亮(红色粗体)
步骤 1:打开报警配置
1.点击「报警配置」按钮;
2.弹出配置窗口。
步骤 2:添加报警规则
1.输入规范地址(如 40001);
2.选择报警类型:「大于」「小于」「等于」;
3.输入阈值(如 80);
4.点击「添加」,规则立即生效。
步骤 3:触发报警
当寄存器数值满足报警规则(如 40001 数值 = 85>80):
1.电脑发出蜂鸣音;
2.日志显示「【报警】40001(温度传感器):85 > 80」(红色粗体)。
步骤 4:管理规则
选中已添加的规则,可点击「编辑」修改或「删除」移除,点击「保存」后规则保存至「alarm_config.json」文件。
- 快捷键与故障排查
8.1 快捷键清单
快捷键 功能 适用场景
F5 打开串口 已检测并选择串口
F6 开始抓包 串口已打开
F7 停止操作 抓包 / 读写 / 批量读取中
Ctrl+S 保存日志(TXT) 串口已打开
Ctrl+E 导出 Excel 执行过读写操作
8.2 常见故障排查
故障现象 排查步骤
检测不到串口 1. 检查设备接线;2. 重装串口驱动;3. 以管理员身份运行工具
打开串口提示「被占用」 1. 关闭其他串口软件(如串口助手);2. 重启电脑释放串口占用
抓包无数据 1. 确认串口参数匹配;2. 检查 485 接线(A/B 线是否接反);3. 设备是否上电
读写操作提示参数非法 1. 从站地址是否在 1-247;2. 数量是否≤功能码最大限制;3. 读取间隔≥0.1 秒
导出 Excel 为空 1. 确认执行过读写 / 批量读取操作;2. 检查是否有结构化数据生成
数据可视化无曲线 1. 确认监控寄存器地址正确;2. 确认该地址有数据读取;3. 重启工具重试
- 配置文件说明
工具运行目录下自动生成 3 个配置文件,删除后恢复默认:
文件名 作用
modbus_config.json 保存历史串口参数、从站地址、最后使用的功能码
reg_aliases.json 保存寄存器别名配置
alarm_config.json 保存报警规则配置
[Python] 纯文本查看 复制代码 import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import queue
import sys
import json
import os
import winsound # 报警蜂鸣
from openpyxl import Workbook # Excel导出
import matplotlib.pyplot as plt # 可视化
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# ========== 修复中文字体显示 ==========
import matplotlib
# 设置matplotlib支持中文(这部分保留,无问题)
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] # 优先使用系统中文字体
matplotlib.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 定义字体配置函数(保留,但暂时不执行)
def set_tkinter_font(root): # 新增参数:传入主窗口对象
default_font = tk.font.nametofont("TkDefaultFont", root=root) # 指定root
default_font.configure(family="SimHei", size=9)
text_font = tk.font.nametofont("TkTextFont", root=root)
text_font.configure(family="SimHei", size=10)
fixed_font = tk.font.nametofont("TkFixedFont", root=root)
fixed_font.configure(family="Consolas", size=10)
# ========== 主类定义 ==========
class ModbusRTUTool:
def __init__(self, root):
# 基础配置
self.root = root
self.root.title("Modbus-RTU工具 (V3.1)")
self.root.geometry("1600x1000")
self.root.resizable(True, True)
self.root.configure(bg="#f0f8ff")
# ========== 关键修复:在这里执行字体配置(主窗口已创建) ==========
set_tkinter_font(self.root)
# 核心状态变量
self.status_var = tk.StringVar(value="就绪:未打开串口 | 支持参数自动扫描 | 精准解析多功能码 | 支持批量读取所有功能码")
self.ser = None
self.is_running = False # 统一运行状态标识
self.data_queue = queue.Queue(maxsize=1000)
self.captured_data = []
# 扩展功能变量
self.structured_data = [] # 结构化数据(用于Excel导出)
self.reg_aliases = {} # 寄存器别名
self.alarm_config = {} # 报警配置
self.slave_list = tk.StringVar(value="1") # 多从站列表
self.plot_data = {"x": [], "y": [], "addr": ""} # 绘图数据
self.start_plot_time = None # 绘图起始时间
# 功能码配置
self.func_codes = {
"01": {"name": "读线圈寄存器", "type": "read", "max": 2000, "base": 1, "prefix": "000"},
"02": {"name": "读离散输入寄存器", "type": "read", "max": 2000, "base": 1, "prefix": "000"},
"03": {"name": "读保持寄存器", "type": "read", "max": 125, "base": 40001, "prefix": ""},
"04": {"name": "读输入寄存器", "type": "read", "max": 125, "base": 30001, "prefix": ""},
"05": {"name": "写单个线圈", "type": "write_single", "max": 1, "base": 1, "prefix": "000"},
"06": {"name": "写单个保持寄存器", "type": "write_single", "max": 1, "base": 40001, "prefix": ""},
"07": {"name": "读取异常状态", "type": "read", "max": 1, "base": 0, "prefix": ""},
"08": {"name": "诊断", "type": "diagnostic", "max": 1, "base": 0, "prefix": ""},
"11": {"name": "获取通信事件计数器", "type": "read", "max": 1, "base": 0, "prefix": ""},
"15": {"name": "写多个线圈", "type": "write_multi", "max": 1968, "base": 1, "prefix": "000"},
"16": {"name": "写多个保持寄存器", "type": "write_multi", "max": 123, "base": 40001, "prefix": ""}
}
self.read_funcs = ["01", "02", "03", "04", "07", "11"]
# 参数变量
self.slave_addr = tk.StringVar(value="1")
self.func_code = tk.StringVar(value="01")
self.start_reg = tk.StringVar(value="0")
self.count = tk.StringVar(value="1")
self.write_val = tk.StringVar(value="1")
self.write_vals = tk.StringVar(value="1,0,1")
self.read_interval = tk.StringVar(value="0.5")
self.batch_params = tk.StringVar(value="01:0,10;02:0,10;03:0,10;04:0,10;07:0,1;11:0,1")
self.batch_interval = tk.StringVar(value="0.2")
self.batch_all = tk.BooleanVar(value=True)
# 扫描参数组合
self.scan_combs = [
("9600", "1", "无校验", "8"), ("4800", "1", "无校验", "8"), ("19200", "1", "无校验", "8"),
("9600", "1", "奇校验", "8"), ("9600", "1", "偶校验", "8"), ("38400", "1", "无校验", "8"),
("57600", "1", "无校验", "8"), ("115200", "1", "无校验", "8"), ("230400", "1", "无校验", "8"),
("2400", "1", "无校验", "8"), ("4800", "1", "奇校验", "8"), ("4800", "1", "偶校验", "8"),
("19200", "1", "奇校验", "8"), ("19200", "1", "偶校验", "8"), ("38400", "1", "奇校验", "8"),
("38400", "1", "偶校验", "8"), ("57600", "1", "奇校验", "8"), ("57600", "1", "偶校验", "8"),
("115200", "1", "奇校验", "8"), ("115200", "1", "偶校验", "8"), ("230400", "1", "奇校验", "8"),
("230400", "1", "偶校验", "8"), ("2400", "1", "奇校验", "8"), ("2400", "1", "偶校验", "8"),
("9600", "2", "无校验", "8"), ("4800", "2", "无校验", "8"), ("19200", "2", "无校验", "8"),
("9600", "2", "奇校验", "8"), ("9600", "2", "偶校验", "8"), ("38400", "2", "无校验", "8"),
("57600", "2", "无校验", "8"), ("115200", "2", "无校验", "8"), ("230400", "2", "无校验", "8"),
("2400", "2", "无校验", "8"), ("4800", "2", "奇校验", "8"), ("4800", "2", "偶校验", "8"),
("19200", "2", "奇校验", "8"), ("19200", "2", "偶校验", "8"), ("38400", "2", "奇校验", "8"),
("38400", "2", "偶校验", "8"), ("57600", "2", "奇校验", "8"), ("57600", "2", "偶校验", "8"),
("115200", "2", "奇校验", "8"), ("115200", "2", "偶校验", "8"), ("230400", "2", "奇校验", "8"),
("230400", "2", "偶校验", "8"), ("2400", "2", "奇校验", "8"), ("2400", "2", "偶校验", "8"),
]
# 加载配置
self._load_all_config()
# 构建UI
self._setup_style()
self._build_ui()
self._bind_events()
# 启动数据更新
self.root.after(50, self._update_display)
def _setup_style(self):
"""样式配置"""
style = ttk.Style(self.root)
style.theme_use("clam")
# 核心样式
style.configure("Title.TLabel", font=("微软雅黑", 12, "bold"), foreground="#1E90FF", background="#f0f8ff")
style.configure("Frame.TLabelframe", font=("微软雅黑", 10, "bold"), foreground="#2F4F4F")
# 按钮样式
btn_styles = {
"Detect.TButton": {"bg": "#4682B4"}, "Open.TButton": {"bg": "#1E90FF"},
"Scan.TButton": {"bg": "#32CD32"}, "Start.TButton": {"bg": "#20B2AA"},
"Stop.TButton": {"bg": "#DC143C"}, "Clear.TButton": {"bg": "#FF8C00"},
"Save.TButton": {"bg": "#9370DB"}, "Operate.TButton": {"bg": "#9932CC"},
"Batch.TButton": {"bg": "#008B8B"}, "Excel.TButton": {"bg": "#228B22"},
"Alias.TButton": {"bg": "#FF6347"}, "Alarm.TButton": {"bg": "#FF4500"}
}
for name, cfg in btn_styles.items():
style.configure(name, font=("微软雅黑", 9), foreground="white", background=cfg["bg"], padding=3)
def _build_ui(self):
"""构建UI"""
# 主标签页
main_notebook = ttk.Notebook(self.root)
main_notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 1. 主功能标签页
main_frame = ttk.Frame(main_notebook)
main_notebook.add(main_frame, text="主功能")
# 标题
ttk.Label(main_frame, text="Modbus-RTU 通信工具 V3.1", style="Title.TLabel").pack(fill=tk.X, pady=(0, 10))
# 串口配置
port_frame = ttk.LabelFrame(main_frame, text="串口配置", style="Frame.TLabelframe")
port_frame.pack(fill=tk.X, pady=(0, 10))
# 串口选择
ttk.Label(port_frame, text="串口:").grid(row=0, column=0, padx=5, pady=5, sticky="e")
self.port_combo = ttk.Combobox(port_frame, state="readonly", width=15)
self.port_combo.grid(row=0, column=1, padx=5, pady=5)
ttk.Button(port_frame, text="检测串口", command=self._detect_ports, style="Detect.TButton").grid(row=0, column=1, padx=(180,5), pady=5, sticky="w")
# 串口参数
params = [
("波特率:", ["110","300","600","1200","2400","4800","9600","19200","38400","57600","115200"], 0, 2, 3, "9600"),
("校验位:", ["无校验","奇校验","偶校验"], 0, 4, 5, "无校验"),
("停止位:", ["1","2"], 1, 2, 3, "1"),
("数据位:", ["8"], 1, 4, 5, "8")
]
self.param_combos = {}
for lbl, vals, r, c1, c2, default in params:
ttk.Label(port_frame, text=lbl).grid(row=r, column=c1, padx=5, pady=5, sticky="e")
combo = ttk.Combobox(port_frame, values=vals, state="readonly", width=10)
combo.set(default)
combo.grid(row=r, column=c2, padx=5, pady=5)
self.param_combos[lbl[:-1]] = combo
# 多从站配置
ttk.Label(port_frame, text="多从站地址:").grid(row=2, column=0, padx=5, pady=5, sticky="e")
ttk.Entry(port_frame, textvariable=self.slave_list, width=20).grid(row=2, column=1, padx=5, pady=5)
ttk.Label(port_frame, text="(逗号分隔,如1,2,3)", font=("微软雅黑", 8), foreground="#696969").grid(row=2, column=2, padx=5, pady=5, sticky="w")
# 控制按钮
ctrl_frame = ttk.Frame(main_frame)
ctrl_frame.pack(fill=tk.X, pady=(0, 10))
self.buttons = {
"open": ttk.Button(ctrl_frame, text="打开串口", command=self._open_serial, style="Open.TButton"),
"scan": ttk.Button(ctrl_frame, text="扫描参数", command=self._start_scan, style="Scan.TButton"),
"sniff": ttk.Button(ctrl_frame, text="开始抓包", command=self._start_sniff, style="Start.TButton", state=tk.DISABLED),
"operate": ttk.Button(ctrl_frame, text="执行操作", command=self._start_operate, style="Operate.TButton", state=tk.DISABLED),
"batch": ttk.Button(ctrl_frame, text="批量读取", command=self._start_batch, style="Batch.TButton", state=tk.DISABLED),
"stop": ttk.Button(ctrl_frame, text="停止操作", command=self._stop_all, style="Stop.TButton", state=tk.DISABLED),
"clear": ttk.Button(ctrl_frame, text="清空日志", command=self._clear_data, style="Clear.TButton"),
"save": ttk.Button(ctrl_frame, text="保存日志(TXT)", command=self._save_data, style="Save.TButton", state=tk.DISABLED),
"excel": ttk.Button(ctrl_frame, text="导出Excel", command=self._save_excel, style="Excel.TButton", state=tk.DISABLED),
"alias": ttk.Button(ctrl_frame, text="寄存器别名", command=self._edit_aliases, style="Alias.TButton"),
"alarm": ttk.Button(ctrl_frame, text="报警配置", command=self._edit_alarm, style="Alarm.TButton"),
}
for btn in self.buttons.values():
btn.pack(side=tk.LEFT, padx=5)
# 操作参数
self._build_operate_params(main_frame)
# 批量读取参数
self._build_batch_params(main_frame)
# 日志显示
log_frame = ttk.LabelFrame(main_frame, text="通信日志", style="Frame.TLabelframe")
log_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
self.log_text = scrolledtext.ScrolledText(log_frame, font=("Consolas", 10), wrap=tk.NONE)
self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.log_text.config(state=tk.DISABLED)
# 日志样式
styles = {
"modbus": ("#006400", "bold"), "info": ("#4169E1", ""), "error": ("#DC143C", "bold"),
"warn": ("#FF8C00", "bold"), "reg_value": ("#0000CD", "bold"), "reg_pair": ("#8B008B", "bold"),
"alarm": ("#FF0000", "bold")
}
for name, (color, weight) in styles.items():
self.log_text.tag_configure(name, foreground=color, font=("Consolas", 10, weight))
# 2. 数据可视化标签页
plot_frame = ttk.Frame(main_notebook)
main_notebook.add(plot_frame, text="数据可视化")
# 可视化配置
plot_ctrl_frame = ttk.Frame(plot_frame)
plot_ctrl_frame.pack(fill=tk.X, pady=10, padx=10)
ttk.Label(plot_ctrl_frame, text="监控寄存器:").pack(side=tk.LEFT, padx=(0,5))
self.plot_reg = ttk.Entry(plot_ctrl_frame, width=15)
self.plot_reg.pack(side=tk.LEFT, padx=(0,15))
self.plot_reg.insert(0, "40001")
ttk.Button(plot_ctrl_frame, text="开始监控", command=self._start_plot, style="Start.TButton").pack(side=tk.LEFT, padx=(0,15))
ttk.Button(plot_ctrl_frame, text="清空曲线", command=self._clear_plot, style="Clear.TButton").pack(side=tk.LEFT)
# 绘图区域
self.fig, self.ax = plt.subplots(figsize=(12, 6))
self.ax.set_title("寄存器数值实时曲线", fontsize=12)
self.ax.set_xlabel("时间(秒)", fontsize=10)
self.ax.set_ylabel("数值", fontsize=10)
self.ax.grid(True, alpha=0.3)
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label="实时数值")
self.ax.legend()
self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 状态栏
ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W).pack(fill=tk.X, side=tk.BOTTOM)
def _build_operate_params(self, parent):
"""构建操作参数布局"""
op_frame = ttk.LabelFrame(parent, text="读写操作参数", style="Frame.TLabelframe")
op_frame.pack(fill=tk.X, pady=(0, 10))
row1 = ttk.Frame(op_frame)
row1.pack(fill=tk.X, padx=10, pady=5)
params = [
("功能码:", self.func_code, list(self.func_codes.keys()), 8),
("从站地址:", self.slave_addr, None, 8),
("起始地址:", self.start_reg, None, 10),
("数量:", self.count, None, 8)
]
self.func_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
self.addr_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
self.count_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
desc_labels = [self.func_desc, self.addr_desc, self.count_desc]
desc_texts = ["(读线圈寄存器)", "(原始地址0→00001)", "(最多2000)"]
for i, (lbl, var, vals, width) in enumerate(params):
ttk.Label(row1, text=lbl).pack(side=tk.LEFT, padx=(0,5))
if vals:
ttk.Combobox(row1, textvariable=var, values=vals, state="readonly", width=width).pack(side=tk.LEFT, padx=(0,15))
else:
ttk.Entry(row1, textvariable=var, width=width).pack(side=tk.LEFT, padx=(0,15))
if i < len(desc_labels):
desc_labels[i].config(text=desc_texts[i])
desc_labels[i].pack(side=tk.LEFT, padx=(0,20))
self.dyn_frame = ttk.Frame(op_frame)
self.dyn_frame.pack(fill=tk.X, padx=10, pady=5)
self.interval_frame = ttk.Frame(self.dyn_frame)
ttk.Label(self.interval_frame, text="读取间隔:").pack(side=tk.LEFT, padx=(0,5))
ttk.Entry(self.interval_frame, textvariable=self.read_interval, width=8).pack(side=tk.LEFT, padx=(0,15))
ttk.Label(self.interval_frame, text="秒(≥0.1)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)
self.single_frame = ttk.Frame(self.dyn_frame)
ttk.Label(self.single_frame, text="写入值:").pack(side=tk.LEFT, padx=(0,5))
self.single_hint = ttk.Label(self.single_frame, font=("微软雅黑", 8), foreground="#696969")
ttk.Entry(self.single_frame, textvariable=self.write_val, width=15).pack(side=tk.LEFT, padx=(0,15))
self.single_hint.pack(side=tk.LEFT)
self.multi_frame = ttk.Frame(self.dyn_frame)
ttk.Label(self.multi_frame, text="写入值列表:").pack(side=tk.LEFT, padx=(0,5))
ttk.Entry(self.multi_frame, textvariable=self.write_vals, width=30).pack(side=tk.LEFT, padx=(0,15))
ttk.Label(self.multi_frame, text="(逗号分隔)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)
def _build_batch_params(self, parent):
"""构建批量参数布局"""
batch_frame = ttk.LabelFrame(parent, text="批量读取参数", style="Frame.TLabelframe")
batch_frame.pack(fill=tk.X, pady=(0, 10))
row1 = ttk.Frame(batch_frame)
row1.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(row1, text="读取参数:").pack(side=tk.LEFT, padx=(0,5))
ttk.Entry(row1, textvariable=self.batch_params, width=60).pack(side=tk.LEFT, padx=(0,15))
ttk.Label(row1, text="格式:功能码:起始地址,数量;...", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)
row2 = ttk.Frame(batch_frame)
row2.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(row2, text="读取间隔:").pack(side=tk.LEFT, padx=(0,5))
ttk.Entry(row2, textvariable=self.batch_interval, width=8).pack(side=tk.LEFT, padx=(0,15))
ttk.Label(row2, text="秒(≥0.1)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT, padx=(0,20))
ttk.Checkbutton(row2, text="自动读取所有读功能码", variable=self.batch_all).pack(side=tk.LEFT, padx=(20,0))
def _bind_events(self):
"""绑定事件"""
self.func_code.trace_add('write', self._on_func_change)
self.root.protocol("WM_DELETE_WINDOW", self._on_close)
# 快捷键
self.root.bind("<F5>", lambda e: self._open_serial())
self.root.bind("<F6>", lambda e: self._start_sniff())
self.root.bind("<F7>", lambda e: self._stop_all())
self.root.bind("<Control-s>", lambda e: self._save_data())
self.root.bind("<Control-e>", lambda e: self._save_excel())
self._on_func_change()
def _load_all_config(self):
"""加载所有配置"""
# 加载别名
if os.path.exists("reg_aliases.json"):
try:
with open("reg_aliases.json", "r", encoding="utf-8") as f:
self.reg_aliases = json.load(f)
except:
self.reg_aliases = {}
# 加载报警
if os.path.exists("alarm_config.json"):
try:
with open("alarm_config.json", "r", encoding="utf-8") as f:
self.alarm_config = json.load(f)
except:
self.alarm_config = {}
# 加载历史配置
if os.path.exists("modbus_config.json"):
try:
with open("modbus_config.json", "r", encoding="utf-8") as f:
config = json.load(f)
if "baudrate" in config:
self.param_combos["波特率"].set(config["baudrate"])
if "slave_addr" in config:
self.slave_addr.set(config["slave_addr"])
if "last_func" in config:
self.func_code.set(config["last_func"])
if "slave_list" in config:
self.slave_list.set(config["slave_list"])
except:
pass
def _save_all_config(self):
"""保存所有配置"""
# 保存历史配置
config = {
"baudrate": self.param_combos["波特率"].get(),
"slave_addr": self.slave_addr.get(),
"last_func": self.func_code.get(),
"slave_list": self.slave_list.get()
}
with open("modbus_config.json", "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 保存别名
with open("reg_aliases.json", "w", encoding="utf-8") as f:
json.dump(self.reg_aliases, f, ensure_ascii=False, indent=2)
# 保存报警
with open("alarm_config.json", "w", encoding="utf-8") as f:
json.dump(self.alarm_config, f, ensure_ascii=False, indent=2)
# ========== 核心功能实现 ==========
def _detect_ports(self):
"""检测串口"""
try:
ports = serial.tools.list_ports.comports()
if not ports:
messagebox.showwarning("提示", "未检测到串口!")
return
self.port_combo['values'] = [f"{p.name} - {p.description[:20]}" for p in ports]
self.port_combo.current(0)
self.status_var.set(f"就绪:检测到{len(ports)}个串口 | 支持参数自动扫描")
except Exception as e:
messagebox.showerror("错误", f"检测串口失败:{str(e)}")
def _get_serial_params(self):
"""获取串口参数"""
parity_map = {"无校验":serial.PARITY_NONE, "奇校验":serial.PARITY_ODD, "偶校验":serial.PARITY_EVEN}
stop_map = {"1":serial.STOPBITS_ONE, "2":serial.STOPBITS_TWO}
return {
"baudrate": int(self.param_combos["波特率"].get()),
"bytesize": serial.EIGHTBITS,
"stopbits": stop_map[self.param_combos["停止位"].get()],
"parity": parity_map[self.param_combos["校验位"].get()],
"timeout": 0.1,
"write_timeout": 0.5
}
def _open_serial(self):
"""打开串口"""
if not self.port_combo.get():
messagebox.showwarning("提示", "请先检测并选择串口!")
return
params = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
if not messagebox.askyesno("参数确认", f"{params}\n是否继续打开?"):
return
self._close_serial()
try:
port = self.port_combo.get().split(" - ")[0]
self.ser = serial.Serial(port=port, **self._get_serial_params())
if self.ser.is_open:
self._set_btn_state(tk.DISABLED)
for btn in ["sniff", "operate", "batch", "stop"]:
self.buttons[btn].config(state=tk.NORMAL)
self.buttons["excel"].config(state=tk.NORMAL)
self.status_var.set(f"就绪:串口已打开 | 参数:{params}")
# 标准化日志格式
self._log("info", f"【成功】打开串口:{port} | 参数:{params}\n")
except Exception as e:
messagebox.showerror("错误", f"串口打开失败:{str(e)}")
self._close_serial()
def _start_scan(self):
"""开始参数扫描"""
if not self.port_combo.get() or self.is_running:
return
self.is_running = True
self._clear_data()
self._set_btn_state(tk.DISABLED)
self.buttons["stop"].config(state=tk.NORMAL)
self.status_var.set("正在扫描:自动匹配合法03帧参数 | 包含波特率/校验位/停止位/数据位")
# 标准化日志格式
self._log("info", "【扫描开始】优先测试停止位=1的参数组合 | 精准显示寄存器数据\n")
threading.Thread(target=self._scan_func, daemon=True).start()
def _scan_func(self):
"""参数扫描逻辑"""
port = self.port_combo.get().split(" - ")[0]
found = False
for idx, (baud, stop, parity, data) in enumerate(self.scan_combs):
if not self.is_running:
break
param_str = f"{baud}/{parity}/{stop}/{data}"
# 标准化日志格式
self._log("info", f"【扫描{idx+1}/{len(self.scan_combs)}】{param_str}\n")
try:
self._close_serial()
self.ser = serial.Serial(port=port, baudrate=int(baud), parity=({"无校验":"N","奇校验":"O","偶校验":"E"})[parity],
stopbits=int(stop), bytesize=int(data), timeout=0.1)
if self.ser.is_open:
start = time.time()
while time.time() - start < 1.5 and self.is_running:
if self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting)
if len(data) >= 8 and data[1] == 0x03:
crc = self._crc16(data[:-2])
if data[-2:] == crc:
found = True
break
time.sleep(0.1)
if found:
# 标准化日志格式
self._log("modbus", f"【找到有效参数】{param_str} | 捕获合法03帧!\n")
self.root.after(0, lambda: self.param_combos["波特率"].set(baud))
self.root.after(0, lambda: self.param_combos["停止位"].set(stop))
self.root.after(0, lambda: self.param_combos["校验位"].set(parity))
self.root.after(0, lambda: messagebox.showinfo("扫描成功", f"找到有效参数:\n{param_str}"))
break
except:
continue
finally:
self._close_serial()
self.is_running = False
self.root.after(0, self._stop_all)
if not found:
self._log("warn", "【扫描完成】未找到合法参数!\n")
self.root.after(0, lambda: messagebox.showinfo("提示", "扫描完成:未找到合法参数"))
def _start_sniff(self):
"""开始抓包"""
if not self.ser or not self.ser.is_open:
return
self.is_running = True
self._set_btn_state(tk.DISABLED)
self.buttons["stop"].config(state=tk.NORMAL)
param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
self.status_var.set(f"正在抓包 | 参数:{param_str}")
# 标准化日志格式
self._log("modbus", f"【抓包开始】精准解析03功能码响应帧,显示每个寄存器的地址和具体数值\n")
self._log("info", f"当前参数:{param_str}\n")
threading.Thread(target=self._sniff_func, daemon=True).start()
def _sniff_func(self):
"""抓包逻辑"""
buffer = bytearray()
while self.is_running and self.ser and self.ser.is_open:
try:
if self.ser.in_waiting > 0:
buffer.extend(self.ser.read(self.ser.in_waiting))
# 支持解析 03/07/11 功能码
while len(buffer) >= 5:
func_code = buffer[1]
# 03功能码帧长≥8,07/11功能码帧长≥5
min_len = 8 if func_code == 0x03 else 5
if len(buffer) < min_len:
break
if func_code not in [0x03, 0x07, 0x0B]: # 0x0B=11
del buffer[0]
continue
frame = buffer[:min_len]
crc = self._crc16(frame[:-2])
if frame[-2:] != crc:
del buffer[0]
continue
slave = frame[0]
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
hex_str = " ".join([f"{b:02X}" for b in frame])
if func_code == 0x03:
start = (frame[2] << 8) | frame[3]
count = (frame[4] << 8) | frame[5]
std_start = self._get_std_addr("03", start)
std_end = self._get_std_addr("03", start + count - 1)
# 标准化日志格式
self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码03 | 起始地址{start}(规范地址{std_start}) | 寄存器数量{count} | 原始数据:{hex_str}\n")
self._log("info", f"【寄存器地址范围】{std_start} ~ {std_end} (共{count}个寄存器)\n")
del buffer[:8]
resp_timeout = time.time() + 0.5
resp_captured = False
while time.time() < resp_timeout and self.is_running:
if self.ser.in_waiting > 0:
resp = self.ser.read(self.ser.in_waiting)
if len(resp) >= 5 and resp[0] == slave and resp[1] == 0x03:
reg_data = self._parse_reg_data("03", start, resp, count)
resp_hex = " ".join([f"{b:02X}" for b in resp])
self._log("reg_value", f"[{now}] 响应帧:{resp_hex}\n")
self._display_reg_data(reg_data, "03")
resp_captured = True
break
time.sleep(0.01)
if not resp_captured:
self._log("warn", f"【超时】未捕获到从站{slave}的响应帧\n")
elif func_code == 0x07:
self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码07 | 读取异常状态 | 原始数据:{hex_str}\n")
del buffer[:5]
elif func_code == 0x0B:
self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码11 | 获取通信事件计数器 | 原始数据:{hex_str}\n")
del buffer[:5]
except:
continue
time.sleep(0.001)
def _start_operate(self):
"""执行读写操作"""
if not self.ser or not self.ser.is_open:
return
try:
slave = int(self.slave_addr.get())
start = int(self.start_reg.get())
count = int(self.count.get())
func = self.func_code.get()
if slave < 1 or slave > 247 or count < 1 or count > self.func_codes[func]["max"]:
messagebox.showwarning("提示", "参数超出合法范围!")
return
if self.func_codes[func]["type"] in ["read", "diagnostic"] and float(self.read_interval.get()) < 0.1:
messagebox.showwarning("提示", "读取间隔不能小于0.1秒!")
return
except:
messagebox.showwarning("提示", "请输入有效的数值参数!")
return
self.is_running = True
self._set_btn_state(tk.DISABLED)
self.buttons["stop"].config(state=tk.NORMAL)
self.status_var.set(f"正在执行{self.func_codes[func]['name']}操作")
threading.Thread(target=self._operate_func, daemon=True).start()
def _operate_func(self):
"""操作执行逻辑"""
# 多从站处理
slave_list = [s.strip() for s in self.slave_list.get().split(",") if s.strip()]
if not slave_list:
slave_list = [self.slave_addr.get()]
while self.is_running and self.ser and self.ser.is_open:
try:
for slave_str in slave_list:
if not self.is_running:
break
slave = int(slave_str)
func = self.func_code.get()
start = int(self.start_reg.get())
count = int(self.count.get())
frame = bytearray([slave, int(func)])
# 07/08/11 功能码帧构造
if func in ["07", "11"]:
# 07/11功能码无需地址和数量参数
pass
elif func == "08":
# 08诊断功能码:默认子功能码0000
frame.extend([0x00, 0x00])
elif self.func_codes[func]["type"] == "read":
frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF])
elif self.func_codes[func]["type"] == "write_single":
if func == "05":
val = int(self.write_val.get())
frame.extend([(start >> 8) & 0xFF, start & 0xFF, 0xFF if val else 0x00, 0x00])
else:
val = int(self.write_val.get())
frame.extend([(start >> 8) & 0xFF, start & 0xFF, (val >> 8) & 0xFF, val & 0xFF])
elif self.func_codes[func]["type"] == "write_multi":
vals = [int(v) for v in self.write_vals.get().split(",")]
byte_count = len(vals) * 2 if func == "16" else (len(vals) + 7) // 8
frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF, byte_count])
if func == "15":
data = bytearray()
for i in range(len(vals)):
if i % 8 == 0:
data.append(0)
if vals[i]:
data[-1] |= (1 << (i % 8))
frame.extend(data)
else:
for val in vals:
frame.extend([(val >> 8) & 0xFF, val & 0xFF])
frame.extend(self._crc16(frame))
self.ser.write(frame)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
hex_str = " ".join([f"{b:02X}" for b in frame])
std_start = self._get_std_addr(func, start)
std_end = self._get_std_addr(func, start + count - 1)
# 标准化日志格式
self._log("info", f"[{now}] 操作帧 | 从站{slave} | 功能码{func} | 地址范围:{std_start} ~ {std_end} | 寄存器数量{count} | 原始数据:{hex_str}\n")
# 解析响应
if func in ["03", "07", "11"]:
time.sleep(0.1)
if self.ser.in_waiting > 0:
resp = self.ser.read(self.ser.in_waiting)
reg_data = self._parse_reg_data(func, start, resp, count)
self._display_reg_data(reg_data, func)
# 操作完成日志
self._log("info", f"【操作完成】{self.func_codes[func]['name']}执行完毕\n")
time.sleep(float(self.read_interval.get())/len(slave_list))
except:
break
self._stop_all()
def _start_batch(self):
"""开始批量读取"""
if not self.ser or not self.ser.is_open:
return
try:
if self.batch_all.get():
batch_list = [(f, 0, 10) if f in ["01","02","03","04"] else (f, 0, 1) for f in self.read_funcs]
else:
batch_list = []
for item in self.batch_params.get().split(";"):
if not item:
continue
func, addr_count = item.split(":")
start, count = addr_count.split(",")
batch_list.append((func.strip(), int(start), int(count)))
if not batch_list or float(self.batch_interval.get()) < 0.1:
raise ValueError("参数不合法")
except:
messagebox.showwarning("提示", "批量读取参数格式错误!")
return
self.is_running = True
self._set_btn_state(tk.DISABLED)
self.buttons["stop"].config(state=tk.NORMAL)
self.status_var.set(f"正在批量读取 | 共{len(batch_list)}条数据")
# 标准化日志格式
func_codes_str = "/".join(self.read_funcs[:4]) # 只显示前4个主要功能码
self._log("info", f"【批量读取开始】自动读取所有读功能码({func_codes_str}) | 读取间隔{self.batch_interval.get()}秒\n\n")
threading.Thread(target=self._batch_func, args=(batch_list,), daemon=True).start()
def _batch_func(self, batch_list):
"""批量读取逻辑"""
interval = float(self.batch_interval.get())
# 多从站处理
slave_list = [s.strip() for s in self.slave_list.get().split(",") if s.strip()]
if not slave_list:
slave_list = [self.slave_addr.get()]
for slave_str in slave_list:
if not self.is_running:
break
slave = int(slave_str)
self._log("info", f"【批量读取从站{slave}】开始读取数据\n")
for idx, (func, start, count) in enumerate(batch_list):
if not self.is_running:
break
try:
frame = bytearray([slave, int(func)])
# 07/11 功能码帧构造
if func in ["07", "11"]:
pass
else:
frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF])
frame.extend(self._crc16(frame))
self.ser.write(frame)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
hex_str = " ".join([f"{b:02X}" for b in frame])
# 标准化日志格式
self._log("info", f"【批量读取 {idx+1}/{len(batch_list)}】{now} | {func}({self.func_codes[func]['name']}) | 起始地址{start} | 数量{count}\n")
self._log("info", f"发送帧:{hex_str}\n")
time.sleep(0.1)
if self.ser.in_waiting > 0:
resp = self.ser.read(self.ser.in_waiting)
resp_hex = " ".join([f"{b:02X}" for b in resp])
self._log("info", f"响应帧:{resp_hex}\n")
reg_data = self._parse_reg_data(func, start, resp, count)
self._display_reg_data(reg_data, func)
if idx < len(batch_list) - 1:
time.sleep(interval)
except Exception as e:
self._log("error", f"【批量读取错误】从站{slave} {func}功能码读取失败:{str(e)}\n")
# 标准化日志格式
self._log("info", "\n【批量读取完成】所有功能码数据读取完毕!\n")
self._stop_all()
# ========== 工具方法 ==========
def _crc16(self, data):
"""CRC16计算"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def _get_std_addr(self, func, raw_addr):
"""标准地址转换"""
cfg = self.func_codes[func]
std_addr = cfg["base"] + raw_addr
return f"{cfg['prefix']}{std_addr}" if cfg['prefix'] else f"{std_addr}"
def _parse_reg_data(self, func, start, frame, count):
"""解析寄存器数据"""
reg_data = []
try:
func_int = int(func)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
raw_hex = " ".join([f"{b:02X}" for b in frame])
# 3/4功能码解析
if func_int in [3, 4]:
data = frame[3:-2] if len(frame) > 5 else frame
for i in range(min(count, len(data)//2)):
addr = self._get_std_addr(func, start + i)
val = (data[i*2] << 8) | data[i*2+1]
reg_data.append((addr, val))
# 结构化数据
self.structured_data.append({
"time": now,
"slave": frame[0] if len(frame) > 0 else "",
"func": func,
"addr": addr,
"value": val,
"raw_hex": raw_hex
})
# 更新绘图
if self.start_plot_time and addr == self.plot_data["addr"]:
self.plot_data["x"].append(time.time() - self.start_plot_time)
self.plot_data["y"].append(val)
# 检查报警
self._check_alarm(addr, val)
# 1/2功能码解析
elif func_int in [1, 2]:
data = frame[1:] if len(frame) > 1 else frame
for i in range(min(count, len(data)*8)):
addr = self._get_std_addr(func, start + i)
val = (data[i//8] >> (i%8)) & 0x01
reg_data.append((addr, val))
# 结构化数据
self.structured_data.append({
"time": now,
"slave": frame[0] if len(frame) > 0 else "",
"func": func,
"addr": addr,
"value": val,
"raw_hex": raw_hex
})
# 检查报警
self._check_alarm(addr, val)
# 07功能码解析(读取异常状态)
elif func_int == 7:
if len(frame) >= 5:
status = frame[2]
status_desc = self._get_exception_status_desc(status)
reg_data.append(("异常状态码", status))
reg_data.append(("状态描述", status_desc))
self.structured_data.append({
"time": now,
"slave": frame[0],
"func": func,
"addr": "异常状态",
"value": f"{status} ({status_desc})",
"raw_hex": raw_hex
})
# 11功能码解析(获取通信事件计数器)
elif func_int == 11:
if len(frame) >= 7:
event_count = (frame[2] << 8) | frame[3]
event_status = (frame[4] << 8) | frame[5]
reg_data.append(("通信事件计数", event_count))
reg_data.append(("通信事件状态", event_status))
self.structured_data.append({
"time": now,
"slave": frame[0],
"func": func,
"addr": "通信事件计数",
"value": event_count,
"raw_hex": raw_hex
})
self.structured_data.append({
"time": now,
"slave": frame[0],
"func": func,
"addr": "通信事件状态",
"value": event_status,
"raw_hex": raw_hex
})
# 08功能码解析(诊断功能)
elif func_int == 8:
if len(frame) >= 7:
sub_func = (frame[2] << 8) | frame[3]
diag_data = (frame[4] << 8) | frame[5]
reg_data.append((f"诊断子功能码({sub_func})", diag_data))
self.structured_data.append({
"time": now,
"slave": frame[0],
"func": func,
"addr": f"诊断子功能{sub_func}",
"value": diag_data,
"raw_hex": raw_hex
})
except:
pass
return reg_data
def _get_exception_status_desc(self, status):
"""07功能码异常状态码描述"""
status_map = {
0x00: "无异常",
0x01: "奇偶校验错误",
0x02: "帧错误",
0x04: "溢出错误",
0x08: "校验和错误",
0x10: "地址错误",
0x20: "功能码不支持",
0x40: "寄存器不存在",
0x80: "其他错误"
}
desc = []
for code, msg in status_map.items():
if status & code:
desc.append(msg)
return " | ".join(desc) if desc else "未知状态"
def _display_reg_data(self, reg_data, func):
"""显示寄存器数据"""
if not reg_data:
return
self._log("info", f"\n【{self.func_codes[func]['name']} - 详细数据】\n")
for addr, val in reg_data:
alias = self.reg_aliases.get(addr, addr)
self._log("reg_pair", f"【寄存器数据】{alias}={val}\n")
self._log("info", f"\n【统计】共解析 {len(reg_data)} 个寄存器数据\n")
def _log(self, tag, content):
"""日志记录"""
try:
self.data_queue.put((tag, content), timeout=0.1)
self.captured_data.append(content)
except:
pass
def _update_display(self):
"""更新日志显示"""
try:
for _ in range(min(10, self.data_queue.qsize())):
tag, content = self.data_queue.get_nowait()
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, content, tag)
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
except:
pass
self.root.after(50, self._update_display)
def _set_btn_state(self, state):
"""设置按钮状态"""
for name, btn in self.buttons.items():
if name != "stop":
btn.config(state=state)
self.buttons["stop"].config(state=tk.NORMAL if state == tk.DISABLED else tk.DISABLED)
def _close_serial(self):
"""关闭串口"""
if self.ser and self.ser.is_open:
try:
self.ser.close()
except:
pass
self.ser = None
def _stop_all(self):
"""停止所有操作"""
self.is_running = False
self._close_serial()
self._set_btn_state(tk.NORMAL)
for btn in ["sniff", "operate", "batch"]:
self.buttons[btn].config(state=tk.DISABLED)
self.buttons["save"].config(state=tk.NORMAL if self.captured_data else tk.DISABLED)
self.buttons["excel"].config(state=tk.NORMAL if self.structured_data else tk.DISABLED)
param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
self.status_var.set(f"就绪:已停止 | 参数:{param_str}")
def _clear_data(self):
"""清空数据"""
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
self.log_text.config(state=tk.DISABLED)
self.captured_data.clear()
self.structured_data.clear()
self.buttons["save"].config(state=tk.DISABLED)
self.buttons["excel"].config(state=tk.DISABLED)
self.status_var.set("就绪:数据已清空")
def _save_data(self):
"""保存日志"""
if not self.captured_data:
messagebox.showwarning("提示", "无数据可保存!")
return
path = filedialog.asksaveasfilename(defaultextension=".txt",
initialfile=f"Modbus_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
filetypes=[("文本文件", "*.txt")])
if not path:
return
try:
with open(path, "w", encoding="utf-8") as f:
f.write(f"Modbus数据 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*80 + "\n")
param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
f.write(f"串口参数:{param_str}\n")
f.write("="*80 + "\n\n")
f.writelines(self.captured_data)
messagebox.showinfo("保存成功", f"数据已保存到:\n{path}")
except:
messagebox.showerror("保存失败", "数据保存出错!")
def _on_func_change(self, *args):
"""功能码变更处理"""
func = self.func_code.get()
cfg = self.func_codes[func]
self.func_desc.config(text=f"({cfg['name']})")
self.addr_desc.config(text=f"(原始地址0→{cfg['prefix']}{cfg['base']})")
self.count_desc.config(text=f"(最多{cfg['max']})")
for frame in [self.interval_frame, self.single_frame, self.multi_frame]:
frame.pack_forget()
if cfg["type"] in ["read", "diagnostic"]:
self.interval_frame.pack(side=tk.LEFT)
elif cfg["type"] == "write_single":
self.single_frame.pack(side=tk.LEFT)
self.single_hint.config(text="(0=断开/1=闭合)" if func == "05" else "(16位数值)")
elif cfg["type"] == "write_multi":
self.multi_frame.pack(side=tk.LEFT)
# ========== 扩展功能 ==========
def _edit_aliases(self):
"""编辑寄存器别名"""
alias_win = tk.Toplevel(self.root)
alias_win.title("寄存器别名配置")
alias_win.geometry("400x300")
alias_win.transient(self.root)
list_frame = ttk.Frame(alias_win)
list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
ttk.Label(list_frame, text="寄存器地址").grid(row=0, column=0, padx=5, pady=5)
ttk.Label(list_frame, text="别名").grid(row=0, column=1, padx=5, pady=5)
# 现有别名显示
rows = []
for addr, alias in self.reg_aliases.items():
row = len(rows) + 1
ttk.Entry(list_frame, width=15).grid(row=row, column=0, padx=5, pady=5)
ttk.Entry(list_frame, width=20).grid(row=row, column=1, padx=5, pady=5)
list_frame.grid_slaves(row=row, column=0)[0].insert(0, addr)
list_frame.grid_slaves(row=row, column=1)[0].insert(0, alias)
rows.append(row)
# 新增行
row = len(rows) + 1
ttk.Entry(list_frame, width=15).grid(row=row, column=0, padx=5, pady=5)
ttk.Entry(list_frame, width=20).grid(row=row, column=1, padx=5, pady=5)
# 保存按钮
def save_aliases():
new_aliases = {}
for r in range(1, len(rows)+2):
addr = list_frame.grid_slaves(row=r, column=0)[0].get().strip()
alias = list_frame.grid_slaves(row=r, column=1)[0].get().strip()
if addr and alias:
new_aliases[addr] = alias
self.reg_aliases = new_aliases
self._save_all_config()
alias_win.destroy()
messagebox.showinfo("成功", "别名配置已保存!")
ttk.Button(alias_win, text="保存", command=save_aliases).pack(pady=10)
def _edit_alarm(self):
"""编辑报警配置"""
alarm_win = tk.Toplevel(self.root)
alarm_win.title("报警配置")
alarm_win.geometry("400x300")
alarm_win.transient(self.root)
ttk.Label(alarm_win, text="寄存器地址:").grid(row=0, column=0, padx=5, pady=5, sticky="e")
alarm_addr = ttk.Entry(alarm_win, width=15)
alarm_addr.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(alarm_win, text="最小值:").grid(row=1, column=0, padx=5, pady=5, sticky="e")
alarm_min = ttk.Entry(alarm_win, width=15)
alarm_min.grid(row=1, column=1, padx=5, pady=5)
ttk.Label(alarm_win, text="最大值:").grid(row=2, column=0, padx=5, pady=5, sticky="e")
alarm_max = ttk.Entry(alarm_win, width=15)
alarm_max.grid(row=2, column=1, padx=5, pady=5)
# 加载现有配置
if self.alarm_config:
addr = next(iter(self.alarm_config.keys()))
alarm_addr.insert(0, addr)
alarm_min.insert(0, self.alarm_config[addr]["min"])
alarm_max.insert(0, self.alarm_config[addr]["max"])
# 保存按钮
def save_alarm():
addr = alarm_addr.get().strip()
try:
min_val = float(alarm_min.get())
max_val = float(alarm_max.get())
if not addr:
raise ValueError("地址不能为空")
self.alarm_config = {
addr: {"min": min_val, "max": max_val, "alarm": True}
}
self._save_all_config()
alarm_win.destroy()
messagebox.showinfo("成功", "报警配置已保存!")
except:
messagebox.showerror("错误", "请输入有效的数值!")
ttk.Button(alarm_win, text="保存", command=save_alarm).grid(row=3, column=1, pady=10)
def _check_alarm(self, addr, val):
"""检查报警"""
if addr in self.alarm_config and self.alarm_config[addr]["alarm"]:
cfg = self.alarm_config[addr]
try:
val = float(val)
if val < cfg["min"] or val > cfg["max"]:
alarm_msg = f"【报警】{self.reg_aliases.get(addr, addr)}数值{val}超出阈值({cfg['min']}-{cfg['max']})!\n"
self._log("alarm", alarm_msg)
# 弹窗+蜂鸣
threading.Thread(target=lambda: messagebox.showerror("报警", f"{self.reg_aliases.get(addr, addr)}\n数值异常:{val}\n阈值:{cfg['min']}-{cfg['max']}"), daemon=True).start()
winsound.Beep(1000, 1000)
except:
pass
def _start_plot(self):
"""开始绘图"""
self.plot_data["addr"] = self.plot_reg.get().strip()
self.plot_data["x"] = []
self.plot_data["y"] = []
self.start_plot_time = time.time()
self._update_plot()
def _update_plot(self):
"""更新绘图"""
if self.plot_data["x"] and self.plot_data["y"]:
self.line.set_data(self.plot_data["x"], self.plot_data["y"])
self.ax.relim()
self.ax.autoscale_view()
self.canvas.draw()
self.root.after(1000, self._update_plot)
def _clear_plot(self):
"""清空绘图"""
self.plot_data["x"] = []
self.plot_data["y"] = []
self.line.set_data([], [])
self.ax.relim()
self.ax.autoscale_view()
self.canvas.draw()
def _save_excel(self):
"""导出Excel"""
if not self.structured_data:
messagebox.showwarning("提示", "无结构化数据可导出!")
return
path = filedialog.asksaveasfilename(
defaultextension=".xlsx",
initialfile=f"Modbus_Excel_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
filetypes=[("Excel文件", "*.xlsx")]
)
if not path:
return
try:
wb = Workbook()
ws = wb.active
ws.title = "Modbus数据"
# 表头
ws.append(["时间", "从站地址", "功能码", "寄存器地址", "别名", "数值", "原始十六进制数据"])
# 写入数据
for data in self.structured_data:
ws.append([
data["time"],
data["slave"],
data["func"],
data["addr"],
self.reg_aliases.get(data["addr"], ""),
data["value"],
data["raw_hex"]
])
wb.save(path)
messagebox.showinfo("成功", f"Excel已导出:\n{path}")
except Exception as e:
messagebox.showerror("错误", f"导出失败:{str(e)}")
def _on_close(self):
"""窗口关闭处理"""
self._stop_all()
self._save_all_config()
if self.captured_data and messagebox.askyesno("提示", "是否保存当前数据?"):
self._save_data()
self.root.destroy()
sys.exit(0)
# ========== 程序入口 ==========
if __name__ == "__main__":
try:
from ctypes import windll
windll.shcore.SetProcessDpiAwareness(1)
except:
pass
# 检查依赖
missing_libs = []
try:
import serial
except ImportError:
missing_libs.append("pyserial")
try:
from openpyxl import Workbook
except ImportError:
missing_libs.append("openpyxl")
try:
import matplotlib
except ImportError:
missing_libs.append("matplotlib")
if missing_libs and messagebox.askyesno("缺少依赖", f"检测到缺少以下库:{', '.join(missing_libs)}\n是否自动安装?"):
import subprocess
for lib in missing_libs:
subprocess.check_call([sys.executable, "-m", "pip", "install", lib])
messagebox.showinfo("成功", "依赖库安装完成,请重启程序!")
sys.exit(0)
root = tk.Tk()
app = ModbusRTUTool(root)
root.mainloop()
|
-
-
免费评分
-
查看全部评分
|