吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1266|回复: 15
上一主题 下一主题
收起左侧

[Python 原创] Modbus-RTU 通信工具 V3.1

  [复制链接]
跳转到指定楼层
楼主
sty19890218 发表于 2026-1-20 15:15 回帖奖励
本帖最后由 sty19890218 于 2026-1-20 15:35 编辑

Modbus-RTU 通信工具(V3.1)详细使用说明书
文档说明
本文档为 Modbus-RTU 通信工具(V3.1)的全流程操作指南,覆盖从软件启动到高级功能使用的每一步操作,适配零基础用户快速上手,同时满足工业调试场景的专业需求。
目录
1.工具基础信息
2.环境部署与启动
3.串口配置(逐步骤)
4.串口参数自动扫描(逐步骤)
5.核心功能操作(逐步骤)
6.数据可视化操作
7.扩展功能(别名 / 报警)
8.快捷键与故障排查
9.配置文件说明

  1. 工具基础信息
    1.1 功能清单
    功能模块        核心能力
    串口管理        自动检测串口、参数配置、多从站支持
    参数自动扫描        遍历 48 组参数组合,自动匹配目标设备的波特率 / 校验位 / 停止位
    数据抓包        精准解析 03/07/11 功能码帧,显示寄存器地址 + 数值
    读写操作        支持 01/02/03/04/05/06/07/08/11/15/16 功能码的读 / 写 / 诊断操作
    批量读取        自定义多功能码批量读取,支持自动遍历所有读功能码
    数据可视化        实时绘制单个寄存器数值曲线
    数据导出        通信日志导出 TXT、结构化数据导出 Excel
    扩展功能        寄存器别名配置、数值超限报警(蜂鸣 + 日志高亮)
    1.2 功能码规范(必看)
    功能码        名称        操作类型        原始地址→规范地址示例        最大操作数量
    01        读线圈寄存器        读        0→00001        2000
    02        读离散输入寄存器        读        0→00001        2000
    03        读保持寄存器        读        0→40001        125
    04        读输入寄存器        读        0→30001        125
    05        写单个线圈        写(单)        0→00001        1
    06        写单个保持寄存器        写(单)        0→40001        1
    07        读取异常状态        读        无规范地址        1
    08        诊断        诊断        无规范地址        1
    11        获取通信事件计数器        读        无规范地址        1
    15        写多个线圈        写(多)        0→00001        1968
    16        写多个保持寄存器        写(多)        0→40001        123
  2. 环境部署与启动
    步骤 1:安装 Python 环境
    1.下载 Python 3.7 及以上版本(推荐 3.9):https://www.python.org/downloads/
    2.安装时勾选「Add Python to PATH」,点击「Install Now」完成安装。
    步骤 2:安装依赖库
    1.按下「Win+R」,输入「cmd」打开命令提示符;
    2.依次执行以下命令(复制后粘贴回车):
    bash
    运行
    pip install pyserial openpyxl matplotlib
    注:tkinter 为 Python 自带库,若提示缺失,执行pip install tkinter补充安装。
    步骤 3:启动工具
    1.将「485 抓包 V3.1.py」文件保存至本地(如桌面);
    2.双击该文件,或在命令提示符中执行:
    bash
    运行
    python 桌面\485抓包V3.1.py
    1.启动成功后,界面显示「Modbus-RTU 工具(V3.1)」标题,状态栏提示「就绪:未打开串口」。
  3. 串口配置(逐步骤)
    步骤 1:检测串口
    1.找到界面「串口配置」区域(左上角);
    2.点击「检测串口」按钮;
    3.若连接串口设备(如 485 转换器、PLC),串口下拉框会显示「COMx - 设备描述」(例:COM3 - USB-SERIAL CH340);
    4.若提示「未检测到串口」,检查:
    1.串口设备是否正确连接电脑;
    2.设备驱动是否安装(如 CH340、PL2303 驱动);
    3.串口是否被其他软件占用(如串口助手、PLC 编程软件)。
    步骤 2:选择串口参数
    参数项        操作步骤
    串口        点击串口下拉框,选择检测到的串口(如 COM3)
    波特率        点击波特率下拉框,选择目标值(常用 9600、19200、115200)
    校验位        点击校验位下拉框,选择「无校验」「奇校验」「偶校验」(默认无校验)
    停止位        点击停止位下拉框,选择「1」或「2」(默认 1)
    数据位        固定为 8 位,无需配置
    多从站地址        如需批量操作多从站,在输入框填写逗号分隔的地址(如 1,2,3),默认仅 1
    步骤 3:打开串口
    1.确认参数后,点击「打开串口」按钮;
    2.弹出参数确认弹窗(例:9600 / 无校验 / 1/8),点击「是」;
    3.成功后状态栏显示「就绪:串口已打开 | 参数:9600 / 无校验 / 1/8」,且「开始抓包」「执行操作」等按钮从灰色变为可点击;
    4.若失败,弹窗提示错误原因(如「串口被占用」「参数错误」),需排查后重新操作。
  4. 串口参数自动扫描(逐步骤)
    适用场景:未知目标设备的串口参数,需自动匹配
    步骤 1:准备工作
    1.确保串口设备已连接,且完成「检测串口」并选择目标串口;
    2.关闭其他占用该串口的软件(避免扫描失败)。
    步骤 2:启动扫描
    1.点击「扫描参数」按钮;
    2.状态栏提示「正在扫描:自动匹配合法 03 帧参数」,日志区域显示「【扫描开始】优先测试停止位 = 1 的参数组合」;
    3.工具自动遍历 48 组参数组合,每扫描一组,日志显示「【扫描 X/48】波特率 / 校验位 / 停止位 / 数据位」。
    步骤 3:扫描结果处理
    1.若找到有效参数:
    1.日志显示「【找到有效参数】9600 / 无校验 / 1/8 | 捕获合法 03 帧!」;
    2.自动填充波特率 / 校验位 / 停止位参数,并弹窗提示「找到有效参数」;
    3.点击「确定」后,可直接点击「打开串口」使用匹配的参数。
    2.若未找到有效参数:
    1.日志显示「【扫描完成】未找到合法参数!」;
    2.弹窗提示「扫描完成:未找到合法参数」;
    3.排查:确认设备是否上电、485 接线是否正确、设备是否支持 Modbus-RTU 03 功能码。
    步骤 4:停止扫描(可选)
    扫描过程中如需终止,点击「停止操作」按钮,工具立即停止扫描并恢复初始状态。
  5. 核心功能操作(逐步骤)
    5.1 数据抓包(实时解析 03/07/11 功能码)
    前提:已成功打开串口
    步骤 1:启动抓包
    1.点击「开始抓包」按钮;
    2.状态栏提示「正在抓包 | 参数:9600 / 无校验 / 1/8」,日志区域显示「【抓包开始】精准解析 03 功能码响应帧」。
    步骤 2:解析数据查看
    1.当串口有 03 功能码数据传输时,日志区域显示:
    1.请求帧:[2024-05-20 10:00:00.000] 合法帧 | 请求帧 | 从站1 | 功能码03 | 起始地址0(规范地址40001) | 寄存器数量10 | 原始数据:01 03 00 00 00 0A C4 0B
    2.响应帧:[2024-05-20 10:00:00.100] 响应帧:01 03 14 00 01 00 02 00 03 ...
    3.寄存器数值:40001: 1 | 40002: 2 | 40003: 3 ...(深蓝色粗体)
    2.若未捕获响应帧,日志显示「【超时】未捕获到从站 1 的响应帧」(橙色粗体)。
    步骤 3:停止抓包
    点击「停止操作」按钮,状态栏恢复「就绪:串口已打开」,抓包停止。
    5.2 单功能码读写操作
    前提:已打开串口,以 03 功能码(读保持寄存器)为例
    步骤 1:配置操作参数
    1.找到「读写操作参数」区域:
    1.功能码:下拉框选择「03」;
    2.从站地址:输入目标从站地址(如 1,范围 1-247);
    3.起始地址:输入原始地址(如 0,对应规范地址 40001);
    4.数量:输入读取数量(如 10,≤125);
    5.读取间隔:输入 0.5(单位秒,≥0.1);
    2.若为写操作(如 06 功能码):
    1.功能码选择「06」;
    2.起始地址输入 0(对应 40001);
    3.写入值:输入要写入的数值(如 100);
    4.读取间隔无需配置(写操作无间隔)。
    步骤 2:执行操作
    1.点击「执行操作」按钮;
    2.若参数非法(如地址>247、数量>125),弹窗提示「参数超出合法范围」;
    3.合法参数则启动操作,日志显示:
    1.操作帧:[2024-05-20 10:10:00.000] 操作帧 | 从站1 | 功能码03 | 地址范围:40001 ~ 40010 | 寄存器数量10 | 原始数据:01 03 00 00 00 0A C4 0B
    2.寄存器数值:实时显示每个寄存器的地址和数值。
    步骤 3:停止操作
    点击「停止操作」按钮,终止读写循环。
    5.3 批量读取(多功能码自动遍历)
    前提:已打开串口
    步骤 1:配置批量参数
    1.找到「批量读取参数」区域:
    1.读取参数:默认值「01:0,10;02:0,10;03:0,10;04:0,10;07:0,1;11:0,1」,格式为「功能码:起始地址,数量;...」;
    2.读取间隔:输入 0.2(单位秒,≥0.1);
    3.勾选「自动读取所有读功能码」(默认勾选,自动遍历 01/02/03/04/07/11)。
    步骤 2:启动批量读取
    1.点击「批量读取」按钮;
    2.状态栏提示「正在执行批量读取操作」,日志按顺序显示每个功能码的读取结果:
    1.[2024-05-20 10:20:00.000] 操作帧 | 从站1 | 功能码01 | 地址范围:00001 ~ 00010 | 寄存器数量10 | 原始数据:01 01 00 00 00 0A FC 08
    2.[2024-05-20 10:20:00.200] 操作帧 | 从站1 | 功能码02 | 地址范围:00001 ~ 00010 | 寄存器数量10 | 原始数据:01 02 00 00 00 0A 7C 08
    步骤 3:停止批量读取
    点击「停止操作」按钮,终止批量读取循环。
    5.4 数据导出
    方式 1:保存通信日志(TXT)
    1.确保串口已打开(未打开则按钮灰色);
    2.点击「保存日志 (TXT)」按钮;
    3.弹出文件保存对话框,选择保存路径(如桌面),输入文件名(如「通信日志 20240520.txt」);
    4.点击「保存」,日志文件包含所有通信记录(时间戳、帧数据、寄存器数值)。
    方式 2:导出 Excel(结构化数据)
    1.执行过读写 / 批量读取操作后,点击「导出 Excel」按钮;
    2.选择保存路径,输入文件名(如「寄存器数据 20240520.xlsx」);
    3.点击「保存」,Excel 文件包含列:时间戳、从站地址、功能码、寄存器地址、数值、别名(若配置)。
  6. 数据可视化操作
    前提:已打开串口,且目标寄存器有数据读取
    步骤 1:配置监控寄存器
    1.切换到「数据可视化」标签页;
    2.在「监控寄存器」输入框填写规范地址(如 40001,对应 03 功能码原始地址 0);
    3.确认输入的地址已在「主功能」标签页中执行过读取操作(否则无数据)。
    步骤 2:启动实时监控
    1.点击「开始监控」按钮;
    2.绘图区域自动绘制数值曲线:
    1.X 轴:时间(秒,从监控开始计时);
    2.Y 轴:寄存器数值;
    3.曲线:实时刷新,蓝色实线显示数值变化。
    步骤 3:清空曲线
    点击「清空曲线」按钮,绘图区域重置,停止当前监控(需重新点击「开始监控」恢复)。
  7. 扩展功能(别名 / 报警)
    7.1 寄存器别名配置
    作用:将规范地址(如 40001)命名为易理解的名称(如「温度传感器」),日志中同步显示
    步骤 1:打开别名配置
    1.点击「寄存器别名」按钮;
    2.弹出配置窗口(首次打开为空)。
    步骤 2:添加 / 编辑别名
    1.输入规范地址(如 40001);
    2.输入别名(如「温度传感器 - 车间 1」);
    3.点击「添加」,别名立即生效;
    4.如需修改,选中已添加的别名,修改后点击「编辑」;
    5.如需删除,选中后点击「删除」。
    步骤 3:保存配置
    配置完成后点击「保存」,别名数据自动保存至「reg_aliases.json」文件(工具同目录),重启工具后仍生效。
    7.2 报警配置
    作用:设置寄存器数值阈值,超出时触发蜂鸣 + 日志高亮(红色粗体)
    步骤 1:打开报警配置
    1.点击「报警配置」按钮;
    2.弹出配置窗口。
    步骤 2:添加报警规则
    1.输入规范地址(如 40001);
    2.选择报警类型:「大于」「小于」「等于」;
    3.输入阈值(如 80);
    4.点击「添加」,规则立即生效。
    步骤 3:触发报警
    当寄存器数值满足报警规则(如 40001 数值 = 85>80):
    1.电脑发出蜂鸣音;
    2.日志显示「【报警】40001(温度传感器):85 > 80」(红色粗体)。
    步骤 4:管理规则
    选中已添加的规则,可点击「编辑」修改或「删除」移除,点击「保存」后规则保存至「alarm_config.json」文件。
  8. 快捷键与故障排查
    8.1 快捷键清单
    快捷键        功能        适用场景
    F5        打开串口        已检测并选择串口
    F6        开始抓包        串口已打开
    F7        停止操作        抓包 / 读写 / 批量读取中
    Ctrl+S        保存日志(TXT)        串口已打开
    Ctrl+E        导出 Excel        执行过读写操作
    8.2 常见故障排查
    故障现象        排查步骤
    检测不到串口        1. 检查设备接线;2. 重装串口驱动;3. 以管理员身份运行工具
    打开串口提示「被占用」        1. 关闭其他串口软件(如串口助手);2. 重启电脑释放串口占用
    抓包无数据        1. 确认串口参数匹配;2. 检查 485 接线(A/B 线是否接反);3. 设备是否上电
    读写操作提示参数非法        1. 从站地址是否在 1-247;2. 数量是否≤功能码最大限制;3. 读取间隔≥0.1 秒
    导出 Excel 为空        1. 确认执行过读写 / 批量读取操作;2. 检查是否有结构化数据生成
    数据可视化无曲线        1. 确认监控寄存器地址正确;2. 确认该地址有数据读取;3. 重启工具重试
  9. 配置文件说明
    工具运行目录下自动生成 3 个配置文件,删除后恢复默认:
    文件名        作用
    modbus_config.json        保存历史串口参数、从站地址、最后使用的功能码
    reg_aliases.json        保存寄存器别名配置
    alarm_config.json        保存报警规则配置










[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import queue
import sys
import json
import os
import winsound  # 报警蜂鸣
from openpyxl import Workbook  # Excel导出
import matplotlib.pyplot as plt  # 可视化
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

# ========== 修复中文字体显示 ==========
import matplotlib
# 设置matplotlib支持中文(这部分保留,无问题)
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']  # 优先使用系统中文字体
matplotlib.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

# 定义字体配置函数(保留,但暂时不执行)
def set_tkinter_font(root):  # 新增参数:传入主窗口对象
    default_font = tk.font.nametofont("TkDefaultFont", root=root)  # 指定root
    default_font.configure(family="SimHei", size=9)
    text_font = tk.font.nametofont("TkTextFont", root=root)
    text_font.configure(family="SimHei", size=10)
    fixed_font = tk.font.nametofont("TkFixedFont", root=root)
    fixed_font.configure(family="Consolas", size=10)

# ========== 主类定义 ==========
class ModbusRTUTool:
    def __init__(self, root):
        # 基础配置
        self.root = root
        self.root.title("Modbus-RTU工具 (V3.1)")
        self.root.geometry("1600x1000")
        self.root.resizable(True, True)
        self.root.configure(bg="#f0f8ff")
        
        # ========== 关键修复:在这里执行字体配置(主窗口已创建) ==========
        set_tkinter_font(self.root)
        
        # 核心状态变量
        self.status_var = tk.StringVar(value="就绪:未打开串口 | 支持参数自动扫描 | 精准解析多功能码 | 支持批量读取所有功能码")
        self.ser = None
        self.is_running = False  # 统一运行状态标识
        self.data_queue = queue.Queue(maxsize=1000)
        self.captured_data = []
        
        # 扩展功能变量
        self.structured_data = []  # 结构化数据(用于Excel导出)
        self.reg_aliases = {}      # 寄存器别名
        self.alarm_config = {}     # 报警配置
        self.slave_list = tk.StringVar(value="1")  # 多从站列表
        self.plot_data = {"x": [], "y": [], "addr": ""}  # 绘图数据
        self.start_plot_time = None  # 绘图起始时间
        
        # 功能码配置
        self.func_codes = {
            "01": {"name": "读线圈寄存器", "type": "read", "max": 2000, "base": 1, "prefix": "000"},
            "02": {"name": "读离散输入寄存器", "type": "read", "max": 2000, "base": 1, "prefix": "000"},
            "03": {"name": "读保持寄存器", "type": "read", "max": 125, "base": 40001, "prefix": ""},
            "04": {"name": "读输入寄存器", "type": "read", "max": 125, "base": 30001, "prefix": ""},
            "05": {"name": "写单个线圈", "type": "write_single", "max": 1, "base": 1, "prefix": "000"},
            "06": {"name": "写单个保持寄存器", "type": "write_single", "max": 1, "base": 40001, "prefix": ""},
            "07": {"name": "读取异常状态", "type": "read", "max": 1, "base": 0, "prefix": ""},
            "08": {"name": "诊断", "type": "diagnostic", "max": 1, "base": 0, "prefix": ""},
            "11": {"name": "获取通信事件计数器", "type": "read", "max": 1, "base": 0, "prefix": ""},
            "15": {"name": "写多个线圈", "type": "write_multi", "max": 1968, "base": 1, "prefix": "000"},
            "16": {"name": "写多个保持寄存器", "type": "write_multi", "max": 123, "base": 40001, "prefix": ""}
        }
        self.read_funcs = ["01", "02", "03", "04", "07", "11"]
        
        # 参数变量
        self.slave_addr = tk.StringVar(value="1")
        self.func_code = tk.StringVar(value="01")
        self.start_reg = tk.StringVar(value="0")
        self.count = tk.StringVar(value="1")
        self.write_val = tk.StringVar(value="1")
        self.write_vals = tk.StringVar(value="1,0,1")
        self.read_interval = tk.StringVar(value="0.5")
        self.batch_params = tk.StringVar(value="01:0,10;02:0,10;03:0,10;04:0,10;07:0,1;11:0,1")
        self.batch_interval = tk.StringVar(value="0.2")
        self.batch_all = tk.BooleanVar(value=True)
        
        # 扫描参数组合
        self.scan_combs = [
            ("9600", "1", "无校验", "8"), ("4800", "1", "无校验", "8"), ("19200", "1", "无校验", "8"),
            ("9600", "1", "奇校验", "8"), ("9600", "1", "偶校验", "8"), ("38400", "1", "无校验", "8"),
            ("57600", "1", "无校验", "8"), ("115200", "1", "无校验", "8"), ("230400", "1", "无校验", "8"),
            ("2400", "1", "无校验", "8"), ("4800", "1", "奇校验", "8"), ("4800", "1", "偶校验", "8"),
            ("19200", "1", "奇校验", "8"), ("19200", "1", "偶校验", "8"), ("38400", "1", "奇校验", "8"),
            ("38400", "1", "偶校验", "8"), ("57600", "1", "奇校验", "8"), ("57600", "1", "偶校验", "8"),
            ("115200", "1", "奇校验", "8"), ("115200", "1", "偶校验", "8"), ("230400", "1", "奇校验", "8"),
            ("230400", "1", "偶校验", "8"), ("2400", "1", "奇校验", "8"), ("2400", "1", "偶校验", "8"),
            ("9600", "2", "无校验", "8"), ("4800", "2", "无校验", "8"), ("19200", "2", "无校验", "8"),
            ("9600", "2", "奇校验", "8"), ("9600", "2", "偶校验", "8"), ("38400", "2", "无校验", "8"),
            ("57600", "2", "无校验", "8"), ("115200", "2", "无校验", "8"), ("230400", "2", "无校验", "8"),
            ("2400", "2", "无校验", "8"), ("4800", "2", "奇校验", "8"), ("4800", "2", "偶校验", "8"),
            ("19200", "2", "奇校验", "8"), ("19200", "2", "偶校验", "8"), ("38400", "2", "奇校验", "8"),
            ("38400", "2", "偶校验", "8"), ("57600", "2", "奇校验", "8"), ("57600", "2", "偶校验", "8"),
            ("115200", "2", "奇校验", "8"), ("115200", "2", "偶校验", "8"), ("230400", "2", "奇校验", "8"),
            ("230400", "2", "偶校验", "8"), ("2400", "2", "奇校验", "8"), ("2400", "2", "偶校验", "8"),
        ]
        
        # 加载配置
        self._load_all_config()
        
        # 构建UI
        self._setup_style()
        self._build_ui()
        self._bind_events()
        
        # 启动数据更新
        self.root.after(50, self._update_display)

    def _setup_style(self):
        """样式配置"""
        style = ttk.Style(self.root)
        style.theme_use("clam")
        
        # 核心样式
        style.configure("Title.TLabel", font=("微软雅黑", 12, "bold"), foreground="#1E90FF", background="#f0f8ff")
        style.configure("Frame.TLabelframe", font=("微软雅黑", 10, "bold"), foreground="#2F4F4F")
        
        # 按钮样式
        btn_styles = {
            "Detect.TButton": {"bg": "#4682B4"}, "Open.TButton": {"bg": "#1E90FF"},
            "Scan.TButton": {"bg": "#32CD32"}, "Start.TButton": {"bg": "#20B2AA"},
            "Stop.TButton": {"bg": "#DC143C"}, "Clear.TButton": {"bg": "#FF8C00"},
            "Save.TButton": {"bg": "#9370DB"}, "Operate.TButton": {"bg": "#9932CC"},
            "Batch.TButton": {"bg": "#008B8B"}, "Excel.TButton": {"bg": "#228B22"},
            "Alias.TButton": {"bg": "#FF6347"}, "Alarm.TButton": {"bg": "#FF4500"}
        }
        for name, cfg in btn_styles.items():
            style.configure(name, font=("微软雅黑", 9), foreground="white", background=cfg["bg"], padding=3)

    def _build_ui(self):
        """构建UI"""
        # 主标签页
        main_notebook = ttk.Notebook(self.root)
        main_notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        
        # 1. 主功能标签页
        main_frame = ttk.Frame(main_notebook)
        main_notebook.add(main_frame, text="主功能")
        
        # 标题
        ttk.Label(main_frame, text="Modbus-RTU 通信工具 V3.1", style="Title.TLabel").pack(fill=tk.X, pady=(0, 10))
        
        # 串口配置
        port_frame = ttk.LabelFrame(main_frame, text="串口配置", style="Frame.TLabelframe")
        port_frame.pack(fill=tk.X, pady=(0, 10))
        
        # 串口选择
        ttk.Label(port_frame, text="串口:").grid(row=0, column=0, padx=5, pady=5, sticky="e")
        self.port_combo = ttk.Combobox(port_frame, state="readonly", width=15)
        self.port_combo.grid(row=0, column=1, padx=5, pady=5)
        ttk.Button(port_frame, text="检测串口", command=self._detect_ports, style="Detect.TButton").grid(row=0, column=1, padx=(180,5), pady=5, sticky="w")
        
        # 串口参数
        params = [
            ("波特率:", ["110","300","600","1200","2400","4800","9600","19200","38400","57600","115200"], 0, 2, 3, "9600"),
            ("校验位:", ["无校验","奇校验","偶校验"], 0, 4, 5, "无校验"),
            ("停止位:", ["1","2"], 1, 2, 3, "1"),
            ("数据位:", ["8"], 1, 4, 5, "8")
        ]
        self.param_combos = {}
        for lbl, vals, r, c1, c2, default in params:
            ttk.Label(port_frame, text=lbl).grid(row=r, column=c1, padx=5, pady=5, sticky="e")
            combo = ttk.Combobox(port_frame, values=vals, state="readonly", width=10)
            combo.set(default)
            combo.grid(row=r, column=c2, padx=5, pady=5)
            self.param_combos[lbl[:-1]] = combo
        
        # 多从站配置
        ttk.Label(port_frame, text="多从站地址:").grid(row=2, column=0, padx=5, pady=5, sticky="e")
        ttk.Entry(port_frame, textvariable=self.slave_list, width=20).grid(row=2, column=1, padx=5, pady=5)
        ttk.Label(port_frame, text="(逗号分隔,如1,2,3)", font=("微软雅黑", 8), foreground="#696969").grid(row=2, column=2, padx=5, pady=5, sticky="w")
        
        # 控制按钮
        ctrl_frame = ttk.Frame(main_frame)
        ctrl_frame.pack(fill=tk.X, pady=(0, 10))
        
        self.buttons = {
            "open": ttk.Button(ctrl_frame, text="打开串口", command=self._open_serial, style="Open.TButton"),
            "scan": ttk.Button(ctrl_frame, text="扫描参数", command=self._start_scan, style="Scan.TButton"),
            "sniff": ttk.Button(ctrl_frame, text="开始抓包", command=self._start_sniff, style="Start.TButton", state=tk.DISABLED),
            "operate": ttk.Button(ctrl_frame, text="执行操作", command=self._start_operate, style="Operate.TButton", state=tk.DISABLED),
            "batch": ttk.Button(ctrl_frame, text="批量读取", command=self._start_batch, style="Batch.TButton", state=tk.DISABLED),
            "stop": ttk.Button(ctrl_frame, text="停止操作", command=self._stop_all, style="Stop.TButton", state=tk.DISABLED),
            "clear": ttk.Button(ctrl_frame, text="清空日志", command=self._clear_data, style="Clear.TButton"),
            "save": ttk.Button(ctrl_frame, text="保存日志(TXT)", command=self._save_data, style="Save.TButton", state=tk.DISABLED),
            "excel": ttk.Button(ctrl_frame, text="导出Excel", command=self._save_excel, style="Excel.TButton", state=tk.DISABLED),
            "alias": ttk.Button(ctrl_frame, text="寄存器别名", command=self._edit_aliases, style="Alias.TButton"),
            "alarm": ttk.Button(ctrl_frame, text="报警配置", command=self._edit_alarm, style="Alarm.TButton"),
        }
        for btn in self.buttons.values():
            btn.pack(side=tk.LEFT, padx=5)
        
        # 操作参数
        self._build_operate_params(main_frame)
        
        # 批量读取参数
        self._build_batch_params(main_frame)
        
        # 日志显示
        log_frame = ttk.LabelFrame(main_frame, text="通信日志", style="Frame.TLabelframe")
        log_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
        self.log_text = scrolledtext.ScrolledText(log_frame, font=("Consolas", 10), wrap=tk.NONE)
        self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.log_text.config(state=tk.DISABLED)
        
        # 日志样式
        styles = {
            "modbus": ("#006400", "bold"), "info": ("#4169E1", ""), "error": ("#DC143C", "bold"),
            "warn": ("#FF8C00", "bold"), "reg_value": ("#0000CD", "bold"), "reg_pair": ("#8B008B", "bold"),
            "alarm": ("#FF0000", "bold")
        }
        for name, (color, weight) in styles.items():
            self.log_text.tag_configure(name, foreground=color, font=("Consolas", 10, weight))
        
        # 2. 数据可视化标签页
        plot_frame = ttk.Frame(main_notebook)
        main_notebook.add(plot_frame, text="数据可视化")
        
        # 可视化配置
        plot_ctrl_frame = ttk.Frame(plot_frame)
        plot_ctrl_frame.pack(fill=tk.X, pady=10, padx=10)
        
        ttk.Label(plot_ctrl_frame, text="监控寄存器:").pack(side=tk.LEFT, padx=(0,5))
        self.plot_reg = ttk.Entry(plot_ctrl_frame, width=15)
        self.plot_reg.pack(side=tk.LEFT, padx=(0,15))
        self.plot_reg.insert(0, "40001")
        
        ttk.Button(plot_ctrl_frame, text="开始监控", command=self._start_plot, style="Start.TButton").pack(side=tk.LEFT, padx=(0,15))
        ttk.Button(plot_ctrl_frame, text="清空曲线", command=self._clear_plot, style="Clear.TButton").pack(side=tk.LEFT)
        
        # 绘图区域
        self.fig, self.ax = plt.subplots(figsize=(12, 6))
        self.ax.set_title("寄存器数值实时曲线", fontsize=12)
        self.ax.set_xlabel("时间(秒)", fontsize=10)
        self.ax.set_ylabel("数值", fontsize=10)
        self.ax.grid(True, alpha=0.3)
        self.line, = self.ax.plot([], [], 'b-', linewidth=2, label="实时数值")
        self.ax.legend()
        
        self.canvas = FigureCanvasTkAgg(self.fig, master=plot_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        
        # 状态栏
        ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W).pack(fill=tk.X, side=tk.BOTTOM)

    def _build_operate_params(self, parent):
        """构建操作参数布局"""
        op_frame = ttk.LabelFrame(parent, text="读写操作参数", style="Frame.TLabelframe")
        op_frame.pack(fill=tk.X, pady=(0, 10))
        
        row1 = ttk.Frame(op_frame)
        row1.pack(fill=tk.X, padx=10, pady=5)
        
        params = [
            ("功能码:", self.func_code, list(self.func_codes.keys()), 8),
            ("从站地址:", self.slave_addr, None, 8),
            ("起始地址:", self.start_reg, None, 10),
            ("数量:", self.count, None, 8)
        ]
        self.func_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
        self.addr_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
        self.count_desc = ttk.Label(row1, font=("微软雅黑", 8), foreground="#696969")
        
        desc_labels = [self.func_desc, self.addr_desc, self.count_desc]
        desc_texts = ["(读线圈寄存器)", "(原始地址0→00001)", "(最多2000)"]
        
        for i, (lbl, var, vals, width) in enumerate(params):
            ttk.Label(row1, text=lbl).pack(side=tk.LEFT, padx=(0,5))
            if vals:
                ttk.Combobox(row1, textvariable=var, values=vals, state="readonly", width=width).pack(side=tk.LEFT, padx=(0,15))
            else:
                ttk.Entry(row1, textvariable=var, width=width).pack(side=tk.LEFT, padx=(0,15))
            
            if i < len(desc_labels):
                desc_labels[i].config(text=desc_texts[i])
                desc_labels[i].pack(side=tk.LEFT, padx=(0,20))
        
        self.dyn_frame = ttk.Frame(op_frame)
        self.dyn_frame.pack(fill=tk.X, padx=10, pady=5)
        
        self.interval_frame = ttk.Frame(self.dyn_frame)
        ttk.Label(self.interval_frame, text="读取间隔:").pack(side=tk.LEFT, padx=(0,5))
        ttk.Entry(self.interval_frame, textvariable=self.read_interval, width=8).pack(side=tk.LEFT, padx=(0,15))
        ttk.Label(self.interval_frame, text="秒(≥0.1)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)
        
        self.single_frame = ttk.Frame(self.dyn_frame)
        ttk.Label(self.single_frame, text="写入值:").pack(side=tk.LEFT, padx=(0,5))
        self.single_hint = ttk.Label(self.single_frame, font=("微软雅黑", 8), foreground="#696969")
        ttk.Entry(self.single_frame, textvariable=self.write_val, width=15).pack(side=tk.LEFT, padx=(0,15))
        self.single_hint.pack(side=tk.LEFT)
        
        self.multi_frame = ttk.Frame(self.dyn_frame)
        ttk.Label(self.multi_frame, text="写入值列表:").pack(side=tk.LEFT, padx=(0,5))
        ttk.Entry(self.multi_frame, textvariable=self.write_vals, width=30).pack(side=tk.LEFT, padx=(0,15))
        ttk.Label(self.multi_frame, text="(逗号分隔)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)

    def _build_batch_params(self, parent):
        """构建批量参数布局"""
        batch_frame = ttk.LabelFrame(parent, text="批量读取参数", style="Frame.TLabelframe")
        batch_frame.pack(fill=tk.X, pady=(0, 10))
        
        row1 = ttk.Frame(batch_frame)
        row1.pack(fill=tk.X, padx=10, pady=5)
        ttk.Label(row1, text="读取参数:").pack(side=tk.LEFT, padx=(0,5))
        ttk.Entry(row1, textvariable=self.batch_params, width=60).pack(side=tk.LEFT, padx=(0,15))
        ttk.Label(row1, text="格式:功能码:起始地址,数量;...", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT)
        
        row2 = ttk.Frame(batch_frame)
        row2.pack(fill=tk.X, padx=10, pady=5)
        ttk.Label(row2, text="读取间隔:").pack(side=tk.LEFT, padx=(0,5))
        ttk.Entry(row2, textvariable=self.batch_interval, width=8).pack(side=tk.LEFT, padx=(0,15))
        ttk.Label(row2, text="秒(≥0.1)", font=("微软雅黑", 8), foreground="#696969").pack(side=tk.LEFT, padx=(0,20))
        ttk.Checkbutton(row2, text="自动读取所有读功能码", variable=self.batch_all).pack(side=tk.LEFT, padx=(20,0))

    def _bind_events(self):
        """绑定事件"""
        self.func_code.trace_add('write', self._on_func_change)
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        
        # 快捷键
        self.root.bind("<F5>", lambda e: self._open_serial())
        self.root.bind("<F6>", lambda e: self._start_sniff())
        self.root.bind("<F7>", lambda e: self._stop_all())
        self.root.bind("<Control-s>", lambda e: self._save_data())
        self.root.bind("<Control-e>", lambda e: self._save_excel())
        
        self._on_func_change()

    def _load_all_config(self):
        """加载所有配置"""
        # 加载别名
        if os.path.exists("reg_aliases.json"):
            try:
                with open("reg_aliases.json", "r", encoding="utf-8") as f:
                    self.reg_aliases = json.load(f)
            except:
                self.reg_aliases = {}
        # 加载报警
        if os.path.exists("alarm_config.json"):
            try:
                with open("alarm_config.json", "r", encoding="utf-8") as f:
                    self.alarm_config = json.load(f)
            except:
                self.alarm_config = {}
        # 加载历史配置
        if os.path.exists("modbus_config.json"):
            try:
                with open("modbus_config.json", "r", encoding="utf-8") as f:
                    config = json.load(f)
                    if "baudrate" in config:
                        self.param_combos["波特率"].set(config["baudrate"])
                    if "slave_addr" in config:
                        self.slave_addr.set(config["slave_addr"])
                    if "last_func" in config:
                        self.func_code.set(config["last_func"])
                    if "slave_list" in config:
                        self.slave_list.set(config["slave_list"])
            except:
                pass

    def _save_all_config(self):
        """保存所有配置"""
        # 保存历史配置
        config = {
            "baudrate": self.param_combos["波特率"].get(),
            "slave_addr": self.slave_addr.get(),
            "last_func": self.func_code.get(),
            "slave_list": self.slave_list.get()
        }
        with open("modbus_config.json", "w", encoding="utf-8") as f:
            json.dump(config, f, ensure_ascii=False, indent=2)
        # 保存别名
        with open("reg_aliases.json", "w", encoding="utf-8") as f:
            json.dump(self.reg_aliases, f, ensure_ascii=False, indent=2)
        # 保存报警
        with open("alarm_config.json", "w", encoding="utf-8") as f:
            json.dump(self.alarm_config, f, ensure_ascii=False, indent=2)

    # ========== 核心功能实现 ==========
    def _detect_ports(self):
        """检测串口"""
        try:
            ports = serial.tools.list_ports.comports()
            if not ports:
                messagebox.showwarning("提示", "未检测到串口!")
                return
            self.port_combo['values'] = [f"{p.name} - {p.description[:20]}" for p in ports]
            self.port_combo.current(0)
            self.status_var.set(f"就绪:检测到{len(ports)}个串口 | 支持参数自动扫描")
        except Exception as e:
            messagebox.showerror("错误", f"检测串口失败:{str(e)}")

    def _get_serial_params(self):
        """获取串口参数"""
        parity_map = {"无校验":serial.PARITY_NONE, "奇校验":serial.PARITY_ODD, "偶校验":serial.PARITY_EVEN}
        stop_map = {"1":serial.STOPBITS_ONE, "2":serial.STOPBITS_TWO}
        
        return {
            "baudrate": int(self.param_combos["波特率"].get()),
            "bytesize": serial.EIGHTBITS,
            "stopbits": stop_map[self.param_combos["停止位"].get()],
            "parity": parity_map[self.param_combos["校验位"].get()],
            "timeout": 0.1,
            "write_timeout": 0.5
        }

    def _open_serial(self):
        """打开串口"""
        if not self.port_combo.get():
            messagebox.showwarning("提示", "请先检测并选择串口!")
            return
        
        params = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
        if not messagebox.askyesno("参数确认", f"{params}\n是否继续打开?"):
            return
        
        self._close_serial()
        
        try:
            port = self.port_combo.get().split(" - ")[0]
            self.ser = serial.Serial(port=port, **self._get_serial_params())
            
            if self.ser.is_open:
                self._set_btn_state(tk.DISABLED)
                for btn in ["sniff", "operate", "batch", "stop"]:
                    self.buttons[btn].config(state=tk.NORMAL)
                self.buttons["excel"].config(state=tk.NORMAL)
                self.status_var.set(f"就绪:串口已打开 | 参数:{params}")
                # 标准化日志格式
                self._log("info", f"【成功】打开串口:{port} | 参数:{params}\n")
        except Exception as e:
            messagebox.showerror("错误", f"串口打开失败:{str(e)}")
            self._close_serial()

    def _start_scan(self):
        """开始参数扫描"""
        if not self.port_combo.get() or self.is_running:
            return
        
        self.is_running = True
        self._clear_data()
        self._set_btn_state(tk.DISABLED)
        self.buttons["stop"].config(state=tk.NORMAL)
        self.status_var.set("正在扫描:自动匹配合法03帧参数 | 包含波特率/校验位/停止位/数据位")
        # 标准化日志格式
        self._log("info", "【扫描开始】优先测试停止位=1的参数组合 | 精准显示寄存器数据\n")
        
        threading.Thread(target=self._scan_func, daemon=True).start()

    def _scan_func(self):
        """参数扫描逻辑"""
        port = self.port_combo.get().split(" - ")[0]
        found = False
        
        for idx, (baud, stop, parity, data) in enumerate(self.scan_combs):
            if not self.is_running:
                break
            
            param_str = f"{baud}/{parity}/{stop}/{data}"
            # 标准化日志格式
            self._log("info", f"【扫描{idx+1}/{len(self.scan_combs)}】{param_str}\n")
            
            try:
                self._close_serial()
                self.ser = serial.Serial(port=port, baudrate=int(baud), parity=({"无校验":"N","奇校验":"O","偶校验":"E"})[parity],
                                       stopbits=int(stop), bytesize=int(data), timeout=0.1)
                
                if self.ser.is_open:
                    start = time.time()
                    while time.time() - start < 1.5 and self.is_running:
                        if self.ser.in_waiting > 0:
                            data = self.ser.read(self.ser.in_waiting)
                            if len(data) >= 8 and data[1] == 0x03:
                                crc = self._crc16(data[:-2])
                                if data[-2:] == crc:
                                    found = True
                                    break
                        time.sleep(0.1)
                    
                    if found:
                        # 标准化日志格式
                        self._log("modbus", f"【找到有效参数】{param_str} | 捕获合法03帧!\n")
                        self.root.after(0, lambda: self.param_combos["波特率"].set(baud))
                        self.root.after(0, lambda: self.param_combos["停止位"].set(stop))
                        self.root.after(0, lambda: self.param_combos["校验位"].set(parity))
                        self.root.after(0, lambda: messagebox.showinfo("扫描成功", f"找到有效参数:\n{param_str}"))
                        break
            except:
                continue
            finally:
                self._close_serial()
        
        self.is_running = False
        self.root.after(0, self._stop_all)
        if not found:
            self._log("warn", "【扫描完成】未找到合法参数!\n")
            self.root.after(0, lambda: messagebox.showinfo("提示", "扫描完成:未找到合法参数"))

    def _start_sniff(self):
        """开始抓包"""
        if not self.ser or not self.ser.is_open:
            return
        
        self.is_running = True
        self._set_btn_state(tk.DISABLED)
        self.buttons["stop"].config(state=tk.NORMAL)
        param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
        self.status_var.set(f"正在抓包 | 参数:{param_str}")
        # 标准化日志格式
        self._log("modbus", f"【抓包开始】精准解析03功能码响应帧,显示每个寄存器的地址和具体数值\n")
        self._log("info", f"当前参数:{param_str}\n")
        
        threading.Thread(target=self._sniff_func, daemon=True).start()

    def _sniff_func(self):
        """抓包逻辑"""
        buffer = bytearray()
        
        while self.is_running and self.ser and self.ser.is_open:
            try:
                if self.ser.in_waiting > 0:
                    buffer.extend(self.ser.read(self.ser.in_waiting))
                
                # 支持解析 03/07/11 功能码
                while len(buffer) >= 5:
                    func_code = buffer[1]
                    # 03功能码帧长≥8,07/11功能码帧长≥5
                    min_len = 8 if func_code == 0x03 else 5
                    if len(buffer) < min_len:
                        break
                    
                    if func_code not in [0x03, 0x07, 0x0B]:  # 0x0B=11
                        del buffer[0]
                        continue
                    
                    frame = buffer[:min_len]
                    crc = self._crc16(frame[:-2])
                    if frame[-2:] != crc:
                        del buffer[0]
                        continue
                    
                    slave = frame[0]
                    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    hex_str = " ".join([f"{b:02X}" for b in frame])
                    
                    if func_code == 0x03:
                        start = (frame[2] << 8) | frame[3]
                        count = (frame[4] << 8) | frame[5]
                        std_start = self._get_std_addr("03", start)
                        std_end = self._get_std_addr("03", start + count - 1)
                        # 标准化日志格式
                        self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码03 | 起始地址{start}(规范地址{std_start}) | 寄存器数量{count} | 原始数据:{hex_str}\n")
                        self._log("info", f"【寄存器地址范围】{std_start} ~ {std_end} (共{count}个寄存器)\n")
                        
                        del buffer[:8]
                        resp_timeout = time.time() + 0.5
                        resp_captured = False
                        while time.time() < resp_timeout and self.is_running:
                            if self.ser.in_waiting > 0:
                                resp = self.ser.read(self.ser.in_waiting)
                                if len(resp) >= 5 and resp[0] == slave and resp[1] == 0x03:
                                    reg_data = self._parse_reg_data("03", start, resp, count)
                                    resp_hex = " ".join([f"{b:02X}" for b in resp])
                                    self._log("reg_value", f"[{now}] 响应帧:{resp_hex}\n")
                                    self._display_reg_data(reg_data, "03")
                                    resp_captured = True
                                    break
                            time.sleep(0.01)
                        if not resp_captured:
                            self._log("warn", f"【超时】未捕获到从站{slave}的响应帧\n")
                    elif func_code == 0x07:
                        self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码07 | 读取异常状态 | 原始数据:{hex_str}\n")
                        del buffer[:5]
                    elif func_code == 0x0B:
                        self._log("modbus", f"[{now}] 合法帧 | 请求帧 | 从站{slave} | 功能码11 | 获取通信事件计数器 | 原始数据:{hex_str}\n")
                        del buffer[:5]
            except:
                continue
            time.sleep(0.001)

    def _start_operate(self):
        """执行读写操作"""
        if not self.ser or not self.ser.is_open:
            return
        
        try:
            slave = int(self.slave_addr.get())
            start = int(self.start_reg.get())
            count = int(self.count.get())
            func = self.func_code.get()
            
            if slave < 1 or slave > 247 or count < 1 or count > self.func_codes[func]["max"]:
                messagebox.showwarning("提示", "参数超出合法范围!")
                return
            
            if self.func_codes[func]["type"] in ["read", "diagnostic"] and float(self.read_interval.get()) < 0.1:
                messagebox.showwarning("提示", "读取间隔不能小于0.1秒!")
                return
        except:
            messagebox.showwarning("提示", "请输入有效的数值参数!")
            return
        
        self.is_running = True
        self._set_btn_state(tk.DISABLED)
        self.buttons["stop"].config(state=tk.NORMAL)
        self.status_var.set(f"正在执行{self.func_codes[func]['name']}操作")
        
        threading.Thread(target=self._operate_func, daemon=True).start()

    def _operate_func(self):
        """操作执行逻辑"""
        # 多从站处理
        slave_list = [s.strip() for s in self.slave_list.get().split(",") if s.strip()]
        if not slave_list:
            slave_list = [self.slave_addr.get()]
        
        while self.is_running and self.ser and self.ser.is_open:
            try:
                for slave_str in slave_list:
                    if not self.is_running:
                        break
                    
                    slave = int(slave_str)
                    func = self.func_code.get()
                    start = int(self.start_reg.get())
                    count = int(self.count.get())
                    
                    frame = bytearray([slave, int(func)])
                    # 07/08/11 功能码帧构造
                    if func in ["07", "11"]:
                        # 07/11功能码无需地址和数量参数
                        pass
                    elif func == "08":
                        # 08诊断功能码:默认子功能码0000
                        frame.extend([0x00, 0x00])
                    elif self.func_codes[func]["type"] == "read":
                        frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF])
                    elif self.func_codes[func]["type"] == "write_single":
                        if func == "05":
                            val = int(self.write_val.get())
                            frame.extend([(start >> 8) & 0xFF, start & 0xFF, 0xFF if val else 0x00, 0x00])
                        else:
                            val = int(self.write_val.get())
                            frame.extend([(start >> 8) & 0xFF, start & 0xFF, (val >> 8) & 0xFF, val & 0xFF])
                    elif self.func_codes[func]["type"] == "write_multi":
                        vals = [int(v) for v in self.write_vals.get().split(",")]
                        byte_count = len(vals) * 2 if func == "16" else (len(vals) + 7) // 8
                        frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF, byte_count])
                        if func == "15":
                            data = bytearray()
                            for i in range(len(vals)):
                                if i % 8 == 0:
                                    data.append(0)
                                if vals[i]:
                                    data[-1] |= (1 << (i % 8))
                            frame.extend(data)
                        else:
                            for val in vals:
                                frame.extend([(val >> 8) & 0xFF, val & 0xFF])
                    
                    frame.extend(self._crc16(frame))
                    self.ser.write(frame)
                    
                    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    hex_str = " ".join([f"{b:02X}" for b in frame])
                    std_start = self._get_std_addr(func, start)
                    std_end = self._get_std_addr(func, start + count - 1)
                    # 标准化日志格式
                    self._log("info", f"[{now}] 操作帧 | 从站{slave} | 功能码{func} | 地址范围:{std_start} ~ {std_end} | 寄存器数量{count} | 原始数据:{hex_str}\n")
                    
                    # 解析响应
                    if func in ["03", "07", "11"]:
                        time.sleep(0.1)
                        if self.ser.in_waiting > 0:
                            resp = self.ser.read(self.ser.in_waiting)
                            reg_data = self._parse_reg_data(func, start, resp, count)
                            self._display_reg_data(reg_data, func)
                    
                    # 操作完成日志
                    self._log("info", f"【操作完成】{self.func_codes[func]['name']}执行完毕\n")
                    time.sleep(float(self.read_interval.get())/len(slave_list))
            except:
                break
        
        self._stop_all()

    def _start_batch(self):
        """开始批量读取"""
        if not self.ser or not self.ser.is_open:
            return
        
        try:
            if self.batch_all.get():
                batch_list = [(f, 0, 10) if f in ["01","02","03","04"] else (f, 0, 1) for f in self.read_funcs]
            else:
                batch_list = []
                for item in self.batch_params.get().split(";"):
                    if not item:
                        continue
                    func, addr_count = item.split(":")
                    start, count = addr_count.split(",")
                    batch_list.append((func.strip(), int(start), int(count)))
            
            if not batch_list or float(self.batch_interval.get()) < 0.1:
                raise ValueError("参数不合法")
        except:
            messagebox.showwarning("提示", "批量读取参数格式错误!")
            return
        
        self.is_running = True
        self._set_btn_state(tk.DISABLED)
        self.buttons["stop"].config(state=tk.NORMAL)
        self.status_var.set(f"正在批量读取 | 共{len(batch_list)}条数据")
        
        # 标准化日志格式
        func_codes_str = "/".join(self.read_funcs[:4])  # 只显示前4个主要功能码
        self._log("info", f"【批量读取开始】自动读取所有读功能码({func_codes_str}) | 读取间隔{self.batch_interval.get()}秒\n\n")
        
        threading.Thread(target=self._batch_func, args=(batch_list,), daemon=True).start()

    def _batch_func(self, batch_list):
        """批量读取逻辑"""
        interval = float(self.batch_interval.get())
        # 多从站处理
        slave_list = [s.strip() for s in self.slave_list.get().split(",") if s.strip()]
        if not slave_list:
            slave_list = [self.slave_addr.get()]
        
        for slave_str in slave_list:
            if not self.is_running:
                break
            
            slave = int(slave_str)
            self._log("info", f"【批量读取从站{slave}】开始读取数据\n")
            
            for idx, (func, start, count) in enumerate(batch_list):
                if not self.is_running:
                    break
                
                try:
                    frame = bytearray([slave, int(func)])
                    # 07/11 功能码帧构造
                    if func in ["07", "11"]:
                        pass
                    else:
                        frame.extend([(start >> 8) & 0xFF, start & 0xFF, (count >> 8) & 0xFF, count & 0xFF])
                    frame.extend(self._crc16(frame))
                    self.ser.write(frame)
                    
                    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    hex_str = " ".join([f"{b:02X}" for b in frame])
                    
                    # 标准化日志格式
                    self._log("info", f"【批量读取 {idx+1}/{len(batch_list)}】{now} | {func}({self.func_codes[func]['name']}) | 起始地址{start} | 数量{count}\n")
                    self._log("info", f"发送帧:{hex_str}\n")
                    
                    time.sleep(0.1)
                    if self.ser.in_waiting > 0:
                        resp = self.ser.read(self.ser.in_waiting)
                        resp_hex = " ".join([f"{b:02X}" for b in resp])
                        self._log("info", f"响应帧:{resp_hex}\n")
                        reg_data = self._parse_reg_data(func, start, resp, count)
                        self._display_reg_data(reg_data, func)
                    
                    if idx < len(batch_list) - 1:
                        time.sleep(interval)
                except Exception as e:
                    self._log("error", f"【批量读取错误】从站{slave} {func}功能码读取失败:{str(e)}\n")
        
        # 标准化日志格式
        self._log("info", "\n【批量读取完成】所有功能码数据读取完毕!\n")
        self._stop_all()

    # ========== 工具方法 ==========
    def _crc16(self, data):
        """CRC16计算"""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
        return bytes([crc & 0xFF, (crc >> 8) & 0xFF])

    def _get_std_addr(self, func, raw_addr):
        """标准地址转换"""
        cfg = self.func_codes[func]
        std_addr = cfg["base"] + raw_addr
        return f"{cfg['prefix']}{std_addr}" if cfg['prefix'] else f"{std_addr}"

    def _parse_reg_data(self, func, start, frame, count):
        """解析寄存器数据"""
        reg_data = []
        try:
            func_int = int(func)
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            raw_hex = " ".join([f"{b:02X}" for b in frame])
            
            # 3/4功能码解析
            if func_int in [3, 4]:
                data = frame[3:-2] if len(frame) > 5 else frame
                for i in range(min(count, len(data)//2)):
                    addr = self._get_std_addr(func, start + i)
                    val = (data[i*2] << 8) | data[i*2+1]
                    reg_data.append((addr, val))
                    
                    # 结构化数据
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0] if len(frame) > 0 else "",
                        "func": func,
                        "addr": addr,
                        "value": val,
                        "raw_hex": raw_hex
                    })
                    
                    # 更新绘图
                    if self.start_plot_time and addr == self.plot_data["addr"]:
                        self.plot_data["x"].append(time.time() - self.start_plot_time)
                        self.plot_data["y"].append(val)
                    
                    # 检查报警
                    self._check_alarm(addr, val)
            
            # 1/2功能码解析
            elif func_int in [1, 2]:
                data = frame[1:] if len(frame) > 1 else frame
                for i in range(min(count, len(data)*8)):
                    addr = self._get_std_addr(func, start + i)
                    val = (data[i//8] >> (i%8)) & 0x01
                    reg_data.append((addr, val))
                    
                    # 结构化数据
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0] if len(frame) > 0 else "",
                        "func": func,
                        "addr": addr,
                        "value": val,
                        "raw_hex": raw_hex
                    })
                    
                    # 检查报警
                    self._check_alarm(addr, val)

            # 07功能码解析(读取异常状态)
            elif func_int == 7:
                if len(frame) >= 5:
                    status = frame[2]
                    status_desc = self._get_exception_status_desc(status)
                    reg_data.append(("异常状态码", status))
                    reg_data.append(("状态描述", status_desc))
                    
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0],
                        "func": func,
                        "addr": "异常状态",
                        "value": f"{status} ({status_desc})",
                        "raw_hex": raw_hex
                    })

            # 11功能码解析(获取通信事件计数器)
            elif func_int == 11:
                if len(frame) >= 7:
                    event_count = (frame[2] << 8) | frame[3]
                    event_status = (frame[4] << 8) | frame[5]
                    reg_data.append(("通信事件计数", event_count))
                    reg_data.append(("通信事件状态", event_status))
                    
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0],
                        "func": func,
                        "addr": "通信事件计数",
                        "value": event_count,
                        "raw_hex": raw_hex
                    })
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0],
                        "func": func,
                        "addr": "通信事件状态",
                        "value": event_status,
                        "raw_hex": raw_hex
                    })

            # 08功能码解析(诊断功能)
            elif func_int == 8:
                if len(frame) >= 7:
                    sub_func = (frame[2] << 8) | frame[3]
                    diag_data = (frame[4] << 8) | frame[5]
                    reg_data.append((f"诊断子功能码({sub_func})", diag_data))
                    
                    self.structured_data.append({
                        "time": now,
                        "slave": frame[0],
                        "func": func,
                        "addr": f"诊断子功能{sub_func}",
                        "value": diag_data,
                        "raw_hex": raw_hex
                    })
        except:
            pass
        return reg_data

    def _get_exception_status_desc(self, status):
        """07功能码异常状态码描述"""
        status_map = {
            0x00: "无异常",
            0x01: "奇偶校验错误",
            0x02: "帧错误",
            0x04: "溢出错误",
            0x08: "校验和错误",
            0x10: "地址错误",
            0x20: "功能码不支持",
            0x40: "寄存器不存在",
            0x80: "其他错误"
        }
        desc = []
        for code, msg in status_map.items():
            if status & code:
                desc.append(msg)
        return " | ".join(desc) if desc else "未知状态"

    def _display_reg_data(self, reg_data, func):
        """显示寄存器数据"""
        if not reg_data:
            return
        
        self._log("info", f"\n【{self.func_codes[func]['name']} - 详细数据】\n")
        
        for addr, val in reg_data:
            alias = self.reg_aliases.get(addr, addr)
            self._log("reg_pair", f"【寄存器数据】{alias}={val}\n")
        
        self._log("info", f"\n【统计】共解析 {len(reg_data)} 个寄存器数据\n")

    def _log(self, tag, content):
        """日志记录"""
        try:
            self.data_queue.put((tag, content), timeout=0.1)
            self.captured_data.append(content)
        except:
            pass

    def _update_display(self):
        """更新日志显示"""
        try:
            for _ in range(min(10, self.data_queue.qsize())):
                tag, content = self.data_queue.get_nowait()
                self.log_text.config(state=tk.NORMAL)
                self.log_text.insert(tk.END, content, tag)
                self.log_text.see(tk.END)
                self.log_text.config(state=tk.DISABLED)
        except:
            pass
        self.root.after(50, self._update_display)

    def _set_btn_state(self, state):
        """设置按钮状态"""
        for name, btn in self.buttons.items():
            if name != "stop":
                btn.config(state=state)
        self.buttons["stop"].config(state=tk.NORMAL if state == tk.DISABLED else tk.DISABLED)

    def _close_serial(self):
        """关闭串口"""
        if self.ser and self.ser.is_open:
            try:
                self.ser.close()
            except:
                pass
        self.ser = None

    def _stop_all(self):
        """停止所有操作"""
        self.is_running = False
        self._close_serial()
        self._set_btn_state(tk.NORMAL)
        
        for btn in ["sniff", "operate", "batch"]:
            self.buttons[btn].config(state=tk.DISABLED)
        
        self.buttons["save"].config(state=tk.NORMAL if self.captured_data else tk.DISABLED)
        self.buttons["excel"].config(state=tk.NORMAL if self.structured_data else tk.DISABLED)
        param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
        self.status_var.set(f"就绪:已停止 | 参数:{param_str}")

    def _clear_data(self):
        """清空数据"""
        self.log_text.config(state=tk.NORMAL)
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state=tk.DISABLED)
        self.captured_data.clear()
        self.structured_data.clear()
        self.buttons["save"].config(state=tk.DISABLED)
        self.buttons["excel"].config(state=tk.DISABLED)
        self.status_var.set("就绪:数据已清空")

    def _save_data(self):
        """保存日志"""
        if not self.captured_data:
            messagebox.showwarning("提示", "无数据可保存!")
            return
        
        path = filedialog.asksaveasfilename(defaultextension=".txt",
                                           initialfile=f"Modbus_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
                                           filetypes=[("文本文件", "*.txt")])
        if not path:
            return
        
        try:
            with open(path, "w", encoding="utf-8") as f:
                f.write(f"Modbus数据 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write("="*80 + "\n")
                param_str = f"{self.param_combos['波特率'].get()}/{self.param_combos['校验位'].get()}/{self.param_combos['停止位'].get()}/{self.param_combos['数据位'].get()}"
                f.write(f"串口参数:{param_str}\n")
                f.write("="*80 + "\n\n")
                f.writelines(self.captured_data)
            
            messagebox.showinfo("保存成功", f"数据已保存到:\n{path}")
        except:
            messagebox.showerror("保存失败", "数据保存出错!")

    def _on_func_change(self, *args):
        """功能码变更处理"""
        func = self.func_code.get()
        cfg = self.func_codes[func]
        
        self.func_desc.config(text=f"({cfg['name']})")
        self.addr_desc.config(text=f"(原始地址0→{cfg['prefix']}{cfg['base']})")
        self.count_desc.config(text=f"(最多{cfg['max']})")
        
        for frame in [self.interval_frame, self.single_frame, self.multi_frame]:
            frame.pack_forget()
        
        if cfg["type"] in ["read", "diagnostic"]:
            self.interval_frame.pack(side=tk.LEFT)
        elif cfg["type"] == "write_single":
            self.single_frame.pack(side=tk.LEFT)
            self.single_hint.config(text="(0=断开/1=闭合)" if func == "05" else "(16位数值)")
        elif cfg["type"] == "write_multi":
            self.multi_frame.pack(side=tk.LEFT)

    # ========== 扩展功能 ==========
    def _edit_aliases(self):
        """编辑寄存器别名"""
        alias_win = tk.Toplevel(self.root)
        alias_win.title("寄存器别名配置")
        alias_win.geometry("400x300")
        alias_win.transient(self.root)
        
        list_frame = ttk.Frame(alias_win)
        list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        
        ttk.Label(list_frame, text="寄存器地址").grid(row=0, column=0, padx=5, pady=5)
        ttk.Label(list_frame, text="别名").grid(row=0, column=1, padx=5, pady=5)
        
        # 现有别名显示
        rows = []
        for addr, alias in self.reg_aliases.items():
            row = len(rows) + 1
            ttk.Entry(list_frame, width=15).grid(row=row, column=0, padx=5, pady=5)
            ttk.Entry(list_frame, width=20).grid(row=row, column=1, padx=5, pady=5)
            list_frame.grid_slaves(row=row, column=0)[0].insert(0, addr)
            list_frame.grid_slaves(row=row, column=1)[0].insert(0, alias)
            rows.append(row)
        
        # 新增行
        row = len(rows) + 1
        ttk.Entry(list_frame, width=15).grid(row=row, column=0, padx=5, pady=5)
        ttk.Entry(list_frame, width=20).grid(row=row, column=1, padx=5, pady=5)
        
        # 保存按钮
        def save_aliases():
            new_aliases = {}
            for r in range(1, len(rows)+2):
                addr = list_frame.grid_slaves(row=r, column=0)[0].get().strip()
                alias = list_frame.grid_slaves(row=r, column=1)[0].get().strip()
                if addr and alias:
                    new_aliases[addr] = alias
            self.reg_aliases = new_aliases
            self._save_all_config()
            alias_win.destroy()
            messagebox.showinfo("成功", "别名配置已保存!")
        
        ttk.Button(alias_win, text="保存", command=save_aliases).pack(pady=10)

    def _edit_alarm(self):
        """编辑报警配置"""
        alarm_win = tk.Toplevel(self.root)
        alarm_win.title("报警配置")
        alarm_win.geometry("400x300")
        alarm_win.transient(self.root)
        
        ttk.Label(alarm_win, text="寄存器地址:").grid(row=0, column=0, padx=5, pady=5, sticky="e")
        alarm_addr = ttk.Entry(alarm_win, width=15)
        alarm_addr.grid(row=0, column=1, padx=5, pady=5)
        
        ttk.Label(alarm_win, text="最小值:").grid(row=1, column=0, padx=5, pady=5, sticky="e")
        alarm_min = ttk.Entry(alarm_win, width=15)
        alarm_min.grid(row=1, column=1, padx=5, pady=5)
        
        ttk.Label(alarm_win, text="最大值:").grid(row=2, column=0, padx=5, pady=5, sticky="e")
        alarm_max = ttk.Entry(alarm_win, width=15)
        alarm_max.grid(row=2, column=1, padx=5, pady=5)
        
        # 加载现有配置
        if self.alarm_config:
            addr = next(iter(self.alarm_config.keys()))
            alarm_addr.insert(0, addr)
            alarm_min.insert(0, self.alarm_config[addr]["min"])
            alarm_max.insert(0, self.alarm_config[addr]["max"])
        
        # 保存按钮
        def save_alarm():
            addr = alarm_addr.get().strip()
            try:
                min_val = float(alarm_min.get())
                max_val = float(alarm_max.get())
                if not addr:
                    raise ValueError("地址不能为空")
                self.alarm_config = {
                    addr: {"min": min_val, "max": max_val, "alarm": True}
                }
                self._save_all_config()
                alarm_win.destroy()
                messagebox.showinfo("成功", "报警配置已保存!")
            except:
                messagebox.showerror("错误", "请输入有效的数值!")
        
        ttk.Button(alarm_win, text="保存", command=save_alarm).grid(row=3, column=1, pady=10)

    def _check_alarm(self, addr, val):
        """检查报警"""
        if addr in self.alarm_config and self.alarm_config[addr]["alarm"]:
            cfg = self.alarm_config[addr]
            try:
                val = float(val)
                if val < cfg["min"] or val > cfg["max"]:
                    alarm_msg = f"【报警】{self.reg_aliases.get(addr, addr)}数值{val}超出阈值({cfg['min']}-{cfg['max']})!\n"
                    self._log("alarm", alarm_msg)
                    # 弹窗+蜂鸣
                    threading.Thread(target=lambda: messagebox.showerror("报警", f"{self.reg_aliases.get(addr, addr)}\n数值异常:{val}\n阈值:{cfg['min']}-{cfg['max']}"), daemon=True).start()
                    winsound.Beep(1000, 1000)
            except:
                pass

    def _start_plot(self):
        """开始绘图"""
        self.plot_data["addr"] = self.plot_reg.get().strip()
        self.plot_data["x"] = []
        self.plot_data["y"] = []
        self.start_plot_time = time.time()
        self._update_plot()

    def _update_plot(self):
        """更新绘图"""
        if self.plot_data["x"] and self.plot_data["y"]:
            self.line.set_data(self.plot_data["x"], self.plot_data["y"])
            self.ax.relim()
            self.ax.autoscale_view()
            self.canvas.draw()
        self.root.after(1000, self._update_plot)

    def _clear_plot(self):
        """清空绘图"""
        self.plot_data["x"] = []
        self.plot_data["y"] = []
        self.line.set_data([], [])
        self.ax.relim()
        self.ax.autoscale_view()
        self.canvas.draw()

    def _save_excel(self):
        """导出Excel"""
        if not self.structured_data:
            messagebox.showwarning("提示", "无结构化数据可导出!")
            return
        
        path = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            initialfile=f"Modbus_Excel_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
            filetypes=[("Excel文件", "*.xlsx")]
        )
        if not path:
            return
        
        try:
            wb = Workbook()
            ws = wb.active
            ws.title = "Modbus数据"
            # 表头
            ws.append(["时间", "从站地址", "功能码", "寄存器地址", "别名", "数值", "原始十六进制数据"])
            # 写入数据
            for data in self.structured_data:
                ws.append([
                    data["time"],
                    data["slave"],
                    data["func"],
                    data["addr"],
                    self.reg_aliases.get(data["addr"], ""),
                    data["value"],
                    data["raw_hex"]
                ])
            wb.save(path)
            messagebox.showinfo("成功", f"Excel已导出:\n{path}")
        except Exception as e:
            messagebox.showerror("错误", f"导出失败:{str(e)}")

    def _on_close(self):
        """窗口关闭处理"""
        self._stop_all()
        self._save_all_config()
        if self.captured_data and messagebox.askyesno("提示", "是否保存当前数据?"):
            self._save_data()
        self.root.destroy()
        sys.exit(0)

# ========== 程序入口 ==========
if __name__ == "__main__":
    try:
        from ctypes import windll
        windll.shcore.SetProcessDpiAwareness(1)
    except:
        pass
    
    # 检查依赖
    missing_libs = []
    try:
        import serial
    except ImportError:
        missing_libs.append("pyserial")
    
    try:
        from openpyxl import Workbook
    except ImportError:
        missing_libs.append("openpyxl")
    
    try:
        import matplotlib
    except ImportError:
        missing_libs.append("matplotlib")
    
    if missing_libs and messagebox.askyesno("缺少依赖", f"检测到缺少以下库:{', '.join(missing_libs)}\n是否自动安装?"):
        import subprocess
        for lib in missing_libs:
            subprocess.check_call([sys.executable, "-m", "pip", "install", lib])
        messagebox.showinfo("成功", "依赖库安装完成,请重启程序!")
        sys.exit(0)
    
    root = tk.Tk()
    app = ModbusRTUTool(root)
    root.mainloop()

0df5fcc2-4836-4f2b-9288-1cef83ca83ac.png (98.63 KB, 下载次数: 1)

0df5fcc2-4836-4f2b-9288-1cef83ca83ac.png

3ed52d3a-7169-4303-9a5e-3673cbc0cd45.png (93.84 KB, 下载次数: 1)

3ed52d3a-7169-4303-9a5e-3673cbc0cd45.png

免费评分

参与人数 7威望 +1 吾爱币 +15 热心值 +6 收起 理由
苏紫方璇 + 1 + 10 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
jiyuwusheng + 1 谢谢@Thanks!
netle8 + 1 生成EXE啊
52rap + 1 + 1 谢谢@Thanks!
wuai3456 + 1 + 1 谢谢@Thanks!
laozhang4201 + 1 + 1 热心回复!
wjlqz + 1 + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

沙发
badboysky 发表于 2026-1-20 21:56
好牛的样子,不懂但是值得顶
3#
jellycici 发表于 2026-1-20 23:38
4#
HXmmyy 发表于 2026-1-21 00:39
5#
平凡男士 发表于 2026-1-21 08:16
看不懂,但是感觉楼主很牛!
6#
baosumi 发表于 2026-1-21 08:27
这个感觉好适用于那个光伏并网开关调试嘞
7#
jun269 发表于 2026-1-21 08:31
真不懂,外行来着,路过
8#
Leon19960120 发表于 2026-1-21 08:49
jellycici 发表于 2026-1-20 23:38
建议封装为可执行文件

PY代码一跑应该就是可执行exe吧
9#
RoyPenn 发表于 2026-1-21 09:28
好东西,可以做成exe
10#
qidx555 发表于 2026-1-21 10:03
好东西,楼主很牛!
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-2-3 15:07

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表