吾爱破解 - 52pojie.cn

[Python, original] RS-485 serial port tool

sty19890218 posted on 2025-12-19 11:09
This post was last edited by sty19890218 on 2026-1-9 12:46
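
A quick way to sanity-check the CRC routine used in the listing below is to run it against a well-known frame. The sketch repeats the same CRC-16/MODBUS loop and adds `build_read_request`, a hypothetical helper (not part of the tool) that assembles a read request the same way the listing does: slave ID, function code, big-endian address and count, then the CRC with its low byte first.

```python
def modbus_crc16(data: bytes) -> bytes:
    """CRC-16/MODBUS: initial value 0xFFFF, reflected polynomial 0xA001."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
    # Modbus RTU transmits the CRC low byte first
    return bytes([crc & 0xFF, (crc >> 8) & 0xFF])


def build_read_request(slave_id: int, func_code: int, start_addr: int, count: int) -> bytes:
    """Hypothetical helper: assemble a read request frame (function codes 01-04)."""
    body = bytes([slave_id, func_code])
    body += start_addr.to_bytes(2, "big") + count.to_bytes(2, "big")
    return body + modbus_crc16(body)


# Read 10 holding registers from slave 1, starting at address 0
frame = build_read_request(1, 3, 0, 10)
print(frame.hex(" "))  # 01 03 00 00 00 0a c5 cd

# Running the CRC over a frame that already carries its CRC yields zero,
# which is a handy check on the receiving side
print(modbus_crc16(frame) == b"\x00\x00")  # True
```

The zero-residue check is an alternative to comparing the last two bytes against a freshly computed CRC; both approaches reject the same corrupted frames.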

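The listing below packs coil states eight to a byte, least significant bit first, both when building a function code 15 request and when decoding a 01/02 reply. A minimal sketch of that bit layout, using two hypothetical helpers (`pack_coils` and `unpack_coils` are illustration names, not functions from the tool):

```python
from typing import List


def pack_coils(values: List[bool]) -> bytes:
    """Pack coil states LSB-first: coil 0 is bit 0 of the first byte."""
    out = bytearray((len(values) + 7) // 8)
    for idx, val in enumerate(values):
        if val:
            out[idx // 8] |= 1 << (idx % 8)
    return bytes(out)


def unpack_coils(data: bytes, count: int) -> List[bool]:
    """Recover `count` coil states from LSB-first packed bytes."""
    return [bool(data[i // 8] & (1 << (i % 8))) for i in range(count)]


# Ten coils need two bytes; the unused high bits of the last byte stay 0
coils = [True, False, True, True, False, False, False, False, True, False]
packed = pack_coils(coils)
print(packed.hex(" "))  # 0d 01
print(unpack_coils(packed, 10) == coils)  # True
```

Because the last byte is zero-padded, the decoder has to be told the coil count; the byte count alone cannot say how many of the final eight bits are meaningful.
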
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
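"""
Modbus RTU master tool (layout-adapted version)
Features: function codes 01/02/03/04 (read) and 05/06/15/16 (write), serial send/receive, log saving, data display
Layout: window size and layout tuned for 1080p and 768p screens; no misalignment when maximized
"""
# Lets the `int | bool` annotations below load on Python versions before 3.10
from __future__ import annotations
import sys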
import time
import serial
import serial.tools.list_ports
from typing import List, Tuple, Optional, Dict, Set
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
    QComboBox, QPushButton, QTextEdit, QCheckBox, QLabel, QMessageBox,
    QGroupBox, QGridLayout, QStatusBar, QLineEdit, QFileDialog,
    QTableWidget, QTableWidgetItem, QHeaderView, QSizePolicy
)
from PyQt5.QtCore import (
    QThread, pyqtSignal, Qt, QTimer, QDateTime, QMutex, QMutexLocker,
    QThreadPool, QRunnable, pyqtSlot
)
from PyQt5.QtGui import QFont, QColor, QClipboard, QPalette

# ===================== Global constants / enums (layout-tuned) =====================
MODBUS_ERROR_CODES: Dict[int, str] = {
    0x01: "非法功能码",
    0x02: "非法数据地址",
    0x03: "非法数据值",
    0x04: "从站设备故障",
    0x05: "确认",
    0x06: "从站设备忙",
    0x07: "否定确认",
    0x08: "内存奇偶校验错误"
}

FUNC_CODE_MAP: Dict[int, Tuple[str, str, str, Tuple[int, int]]] = {
    1: ("读线圈", "read", "Read Coils", (0, 65535)),
    2: ("读离散输入", "read", "Read Discrete Inputs", (0, 65535)),
    3: ("读保持寄存器", "read", "Read Holding Registers", (0, 65535)),
    4: ("读输入寄存器", "read", "Read Input Registers", (0, 65535)),
    5: ("写单线圈", "write", "Write Single Coil", (0, 65535)),
    6: ("写单寄存器", "write", "Write Single Register", (0, 65535)),
    15: ("写多线圈", "write", "Write Multiple Coils", (0, 65535)),
    16: ("写多寄存器", "write", "Write Multiple Registers", (0, 65535))
}

DISPLAY_MODE_MAP: Dict[str, str] = {
    "十进制": "dec",
    "十六进制": "hex",
    "二进制": "bin"
}

DEFAULT_SERIAL_PARAMS = {
    "baudrate": 9600,
    "databits": 8,
    "stopbits": 1.0,
    "parity": serial.PARITY_NONE,
    "rtscts": False,
    "timeout": 0.1
}

# Fonts: smaller point sizes so the UI fits common screen resolutions
FONT_CONFIG = {
    "default": QFont("Microsoft YaHei", 8),    # was 14
    "mono": QFont("Consolas", 8),              # was 14
    "title": QFont("Microsoft YaHei", 8, QFont.Bold),  # was 16
    "status": QFont("Microsoft YaHei", 8)      # was 14
}

# ===================== Utility functions =====================
def modbus_crc16(data: bytes) -> bytes:
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
    return bytes([crc & 0xFF, (crc >> 8) & 0xFF])

def validate_numeric_input(text: str, min_val: int = 0, max_val: int = 65535, field_name: str = "值") -> Optional[int]:
    try:
        val = int(text.strip())
        if min_val <= val <= max_val:
            return val
        QMessageBox.warning(None, "输入错误", f"{field_name}需在 {min_val}-{max_val} 之间")
        return None
    except ValueError:
        QMessageBox.warning(None, "输入错误", f"{field_name}请输入有效的整数")
        return None

def validate_modbus_addr(func_code: int, addr: int) -> bool:
    min_addr, max_addr = FUNC_CODE_MAP[func_code][3]
    if not (min_addr <= addr <= max_addr):
        QMessageBox.warning(None, "地址错误",
                           f"{FUNC_CODE_MAP[func_code][0]}的地址范围为 {min_addr}-{max_addr}")
        return False
    return True

def format_value(val: int | bool, mode: str) -> str:
    if isinstance(val, bool):
        return "1" if val else "0"
    if mode == "hex":
        return f"0x{val:04X}"
    elif mode == "bin":
        return f"0b{val:016b}"
    return str(val)

def parse_write_values(text: str, func_code: int, count: int) -> Optional[List[int | bool]]:
    try:
        values = [v.strip() for v in text.split(",") if v.strip()]
        write_values = []
        
        if func_code in [5, 6] and len(values) != 1:
            raise Exception(f"{FUNC_CODE_MAP[func_code][0]}仅支持单个值")
        if func_code in [15, 16] and len(values) < count:
            raise Exception(f"{FUNC_CODE_MAP[func_code][0]}需输入{count}个值(当前{len(values)}个)")
        
        for idx, val in enumerate(values[:count]):
            if func_code in [5, 15]:
                if val.lower() in ["1", "true", "on"]:
                    write_values.append(True)
                elif val.lower() in ["0", "false", "off"]:
                    write_values.append(False)
                else:
                    raise Exception(f"线圈值{idx+1}需为 0/1/True/False")
            else:
                int_val = int(val)
                if not (0 <= int_val <= 65535):
                    raise Exception(f"寄存器值{idx+1}需在 0-65535 之间")
                write_values.append(int_val)
        
        return write_values
    except Exception as e:
        QMessageBox.warning(None, "写值解析失败", str(e))
        return None

def scan_valid_ports() -> List[str]:
    valid_ports: Set[str] = set()
    ports = serial.tools.list_ports.comports()
    for port in ports:
        if port.device and not any(filter_word in port.description.lower()
                                  for filter_word in ["bluetooth", "蓝牙", "virtual", "虚拟"]):
            port_name = f"{port.device} - {port.description}"
            valid_ports.add(port_name)
    return sorted(list(valid_ports))

# ===================== Serial receive thread =====================
class SerialReceiveThread(QThread):
    receive_signal = pyqtSignal(bytes)
    error_signal = pyqtSignal(str)
   
    def __init__(self, serial_obj: serial.Serial):
        super().__init__()
        self.serial = serial_obj
        self._is_running = False
        self._mutex = QMutex()
   
    def run(self) -> None:
        # QThread.setPriority() does nothing until the thread is running, so set it here
        self.setPriority(QThread.LowPriority)
        with QMutexLocker(self._mutex):
            self._is_running = True
        
        while True:
            with QMutexLocker(self._mutex):
                if not self._is_running or not self.serial.is_open:
                    break
            
            try:
                if self.serial.in_waiting > 0:
                    data = self.serial.read(self.serial.in_waiting)
                    self.receive_signal.emit(data)
                time.sleep(0.005)
            except Exception as e:
                self.error_signal.emit(f"接收异常:{str(e)}")
                break
   
    def stop(self) -> None:
        with QMutexLocker(self._mutex):
            self._is_running = False
        if not self.wait(2000):
            self.terminate()
            self.wait()

# ===================== Modbus worker thread =====================
class ModbusThread(QThread):
    poll_result = pyqtSignal(list, int, int)
    status_update = pyqtSignal(str)
    log_update = pyqtSignal(str, str)
    write_result = pyqtSignal(bool, str)
   
    def __init__(self, serial_obj: serial.Serial):
        super().__init__()
        self.serial = serial_obj
        self._is_running = False
        self._mutex = QMutex()
        self.slave_id: int = 1
        self.func_code: int = 3
        self.start_addr: int = 0
        self.count: int = 10
        self.interval: int = 1000
        self.write_values: List[int | bool] = []
        self.is_write_op: bool = False
        self.tx_count: int = 0
        self.err_count: int = 0
   
    def set_read_params(self, slave_id: int, func_code: int, start_addr: int, count: int, interval: int) -> None:
        with QMutexLocker(self._mutex):
            self.slave_id = slave_id
            self.func_code = func_code
            self.start_addr = start_addr
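            # Modbus allows at most 2000 coils/discrete inputs or 125 registers per read
            self.count = min(count, 2000 if func_code in (1, 2) else 125)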
            self.interval = interval
            self.is_write_op = False
            self.tx_count = 0
            self.err_count = 0
   
    def set_write_params(self, slave_id: int, func_code: int, start_addr: int, count: int, values: List[int | bool]) -> None:
        with QMutexLocker(self._mutex):
            self.slave_id = slave_id
            self.func_code = func_code
            self.start_addr = start_addr
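            # Modbus limits: one item for 05/06, 1968 coils for 15, 123 registers for 16
            self.count = min(count, {15: 1968, 16: 123}.get(func_code, 1))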
            self.write_values = values[:self.count]
            self.is_write_op = True
            self.tx_count = 0
            self.err_count = 0
   
    def run(self) -> None:
        with QMutexLocker(self._mutex):
            self._is_running = True
        
        try:
            if self.is_write_op:
                self._execute_write()
            else:
                self._execute_read_loop()
        except Exception as e:
            err_msg = f"操作异常:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
        
        with QMutexLocker(self._mutex):
            self._is_running = False
   
    def _execute_read_loop(self) -> None:
        while True:
            with QMutexLocker(self._mutex):
                if not self._is_running or not self.serial.is_open:
                    break
            
            self._execute_read_once()
            sleep_steps = int(self.interval / 10)
            for _ in range(sleep_steps):
                time.sleep(0.01)
                with QMutexLocker(self._mutex):
                    if not self._is_running:
                        break
   
    def _execute_read_once(self) -> None:
        try:
            req_data = self._build_read_request()
            if not req_data:
                return
            
            self.serial.write(req_data)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送",
                               f"从站{self.slave_id} 功能码{self.func_code:02X} | {req_data.hex(' ')}")
            
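            # Poll until the full reply arrives instead of sleeping a fixed time:
            # a normal read reply is 5 framing bytes plus the data bytes,
            # an exception reply (function code | 0x80) is always 5 bytes.
            data_len = (self.count + 7) // 8 if self.func_code in (1, 2) else self.count * 2
            expected_len = 5 + data_len
            resp_data = b""
            deadline = time.monotonic() + 1.0  # 1 s reply timeout (assumed value)
            while len(resp_data) < expected_len and time.monotonic() < deadline:
                if self.serial.in_waiting:
                    resp_data += self.serial.read(self.serial.in_waiting)
                    if len(resp_data) >= 5 and resp_data[1] & 0x80:
                        break
                else:
                    time.sleep(0.002)
            if not resp_data:
                raise Exception("从站无响应")
            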
            self.log_update.emit("Modbus-接收",
                               f"从站{self.slave_id} | {resp_data.hex(' ')}")
            
            self._validate_response(resp_data)
            data_list = self._parse_response(resp_data)
            self.poll_result.emit(data_list, self.tx_count, self.err_count)
            self.status_update.emit(f"读成功:{self.count}个{FUNC_CODE_MAP[self.func_code][0]}")
        except Exception as e:
            self.err_count += 1
            err_msg = f"读失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误",
                               f"{err_msg} | Tx={self.tx_count} Err={self.err_count}")
            self.poll_result.emit([], self.tx_count, self.err_count)
   
    def _execute_write(self) -> None:
        try:
            req_data = self._build_write_request()
            if not req_data:
                return
            
            self.serial.write(req_data)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送",
                               f"从站{self.slave_id} 功能码{self.func_code:02X} | {req_data.hex(' ')}")
            
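            # Poll until the full reply arrives instead of sleeping a fixed time:
            # every normal write reply is 8 bytes, an exception reply is 5 bytes.
            expected_len = 8
            resp_data = b""
            deadline = time.monotonic() + 1.0  # 1 s reply timeout (assumed value)
            while len(resp_data) < expected_len and time.monotonic() < deadline:
                if self.serial.in_waiting:
                    resp_data += self.serial.read(self.serial.in_waiting)
                    if len(resp_data) >= 5 and resp_data[1] & 0x80:
                        break
                else:
                    time.sleep(0.002)
            if not resp_data:
                raise Exception("从站无响应")
            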
            self.log_update.emit("Modbus-接收",
                               f"从站{self.slave_id} | {resp_data.hex(' ')}")
            
            self._validate_write_response(req_data, resp_data)
            success_msg = f"写成功:{self.count}个{FUNC_CODE_MAP[self.func_code][0]}"
            self.write_result.emit(True, success_msg)
            self.status_update.emit(success_msg)
            self.poll_result.emit([], self.tx_count, self.err_count)
        except Exception as e:
            self.err_count += 1
            err_msg = f"写失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误",
                               f"{err_msg} | Tx={self.tx_count} Err={self.err_count}")
            self.write_result.emit(False, err_msg)
            self.poll_result.emit([], self.tx_count, self.err_count)
   
    def _build_read_request(self) -> Optional[bytearray]:
        try:
            req = bytearray([self.slave_id, self.func_code])
            req.extend(self.start_addr.to_bytes(2, 'big'))
            req.extend(self.count.to_bytes(2, 'big'))
            req.extend(modbus_crc16(req))
            return req
        except Exception as e:
            err_msg = f"读报文组装失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
            return None
   
    def _build_write_request(self) -> Optional[bytearray]:
        try:
            req = bytearray([self.slave_id, self.func_code])
            if self.func_code == 5:
                req.extend(self.start_addr.to_bytes(2, 'big'))
                req.extend((0xFF00 if self.write_values[0] else 0x0000).to_bytes(2, 'big'))
            elif self.func_code == 6:
                req.extend(self.start_addr.to_bytes(2, 'big'))
                req.extend(int(self.write_values[0]).to_bytes(2, 'big'))
            elif self.func_code == 15:
                req.extend(self.start_addr.to_bytes(2, 'big'))
                req.extend(self.count.to_bytes(2, 'big'))
                byte_count = (self.count + 7) // 8
                req.append(byte_count)
                coil_bytes = bytearray(byte_count)
                for idx, val in enumerate(self.write_values[:self.count]):
                    if val:
                        coil_bytes[idx//8] |= 1 << (idx%8)
                req.extend(coil_bytes)
            elif self.func_code == 16:
                req.extend(self.start_addr.to_bytes(2, 'big'))
                req.extend(self.count.to_bytes(2, 'big'))
                req.append(self.count * 2)
                for val in self.write_values[:self.count]:
                    req.extend(int(val).to_bytes(2, 'big'))
            else:
                raise Exception(f"不支持的写功能码:{self.func_code}")
            
            req.extend(modbus_crc16(req))
            return req
        except Exception as e:
            err_msg = f"写报文组装失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
            return None
   
    def _validate_response(self, resp_data: bytes) -> None:
        if len(resp_data) < 5:
            raise Exception("响应数据过短(小于5字节)")
        
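        resp_crc = resp_data[-2:]
        calc_crc = modbus_crc16(resp_data[:-2])
        if resp_crc != calc_crc:
            raise Exception(f"CRC校验失败(接收:{resp_crc.hex()} 计算:{calc_crc.hex()})")
        
        # Reject frames from another slave on the shared RS-485 bus
        if resp_data[0] != self.slave_id:
            raise Exception(f"从站地址不匹配(接收:{resp_data[0]} 预期:{self.slave_id})")
        
        if resp_data[1] == self.func_code + 0x80:
            err_code = resp_data[2]
            raise Exception(f"从站异常:{MODBUS_ERROR_CODES.get(err_code, f'未知错误{err_code:02X}')}")
        
        if resp_data[1] != self.func_code:
            raise Exception(f"功能码不匹配(接收:{resp_data[1]} 预期:{self.func_code})")
        
        # The byte-count field must match the payload actually received
        if resp_data[2] != len(resp_data) - 5:
            raise Exception(f"字节数不匹配(声明:{resp_data[2]} 实际:{len(resp_data) - 5})")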
   
    def _validate_write_response(self, req_data: bytes, resp_data: bytes) -> None:
        if len(resp_data) < 5:
            raise Exception("响应数据过短(小于5字节)")
        
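        resp_crc = resp_data[-2:]
        calc_crc = modbus_crc16(resp_data[:-2])
        if resp_crc != calc_crc:
            raise Exception(f"CRC校验失败(接收:{resp_crc.hex()} 计算:{calc_crc.hex()})")
        
        # Report a slave exception reply (function code | 0x80) with its error code
        if resp_data[1] == self.func_code + 0x80:
            err_code = resp_data[2]
            raise Exception(f"从站异常:{MODBUS_ERROR_CODES.get(err_code, f'未知错误{err_code:02X}')}")
        
        if self.func_code in [5, 6]: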
            if resp_data != req_data:
                raise Exception("写响应与请求不匹配")
        elif self.func_code in [15, 16]:
            resp_addr = int.from_bytes(resp_data[2:4], 'big')
            resp_count = int.from_bytes(resp_data[4:6], 'big')
            if resp_addr != self.start_addr or resp_count != self.count:
                raise Exception(f"地址/数量不匹配(地址:{resp_addr}≠{self.start_addr} 数量:{resp_count}≠{self.count})")
   
    def _parse_response(self, resp_data: bytes) -> List[int | bool]:
        data_list = []
        if self.func_code in [1, 2]:
            byte_count = resp_data[2]
            for idx in range(self.count):
                if idx >= byte_count * 8:
                    data_list.append(False)
                else:
                    data_list.append((resp_data[3 + idx//8] & (1 << (idx%8))) != 0)
        elif self.func_code in [3, 4]:
            for i in range(self.count):
                if i * 2 + 3 >= len(resp_data) - 2:
                    data_list.append(0)
                else:
                    data_list.append(int.from_bytes(resp_data[3+i*2 : 5+i*2], 'big'))
        return data_list
   
    def stop(self) -> None:
        with QMutexLocker(self._mutex):
            self._is_running = False
        if not self.wait(2000):
            self.terminate()
            self.wait()

# ===================== Main window (layout tuned) =====================
class ModbusRTUMainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Modbus RTU 主站工具(适配版)")
        # Initial size tuned for 1080p (was 1800x1100, now 1200x800)
        self.setGeometry(100, 100, 1200, 800)
        # Minimum size tuned for 768p (was 1600x1000, now 800x600)
        self.setMinimumSize(800, 600)
        
        self._setup_global_style()
        self.serial: serial.Serial = serial.Serial()
        self.receive_thread: Optional[SerialReceiveThread] = None
        self.modbus_thread: Optional[ModbusThread] = None
        
        self.loop_send_timer = QTimer()
        self.loop_send_timer.timeout.connect(self.send_data)
        self.port_scan_timer = QTimer()
        self.port_scan_timer.timeout.connect(self.scan_serial_ports)
        self.port_scan_timer.start(2000)
        
        self.is_serial_open: bool = False
        self.is_loop_sending: bool = False
        self.display_mode: str = "dec"
        self.total_receive_bytes: int = 0
        self.session_receive_bytes: int = 0
        
        self._init_ui()
        self.show()
   
    def _setup_global_style(self):
        palette = QPalette()
        palette.setColor(QPalette.Window, QColor(245, 245, 245))
        palette.setColor(QPalette.WindowText, QColor(50, 50, 50))
        palette.setColor(QPalette.Base, QColor(255, 255, 255))
        palette.setColor(QPalette.AlternateBase, QColor(240, 240, 240))
        palette.setColor(QPalette.Button, QColor(230, 230, 230))
        palette.setColor(QPalette.ButtonText, QColor(50, 50, 50))
        self.setPalette(palette)
        
        # Stylesheet: smaller controls so the layout holds up when maximized
        self.setStyleSheet("""
            QMainWindow {background: #F5F5F5;}
            QGroupBox {
                font-weight: bold;
                font-size: 10px;  /* was 16 */
                border: 1px solid #DDD;
                border-radius: 6px;
                margin-top: 6px;   /* was 12 */
                padding-top: 6px;  /* was 10 */
            }
            QPushButton {
                background: #E8E8E8;
                border: 1px solid #CCC;
                border-radius: 6px;
                padding: 6px 14px; /* was 10px 20px */
                font-size: 6px;   /* was 14 */
                min-height: 20px;  /* was 40 */
            }
            QPushButton:hover {background: #D8D8D8;}
            QPushButton:disabled {background: #F0F0F0; color: #999;}
            QPushButton#primaryBtn {
                background: #4A90E2;
                color: white;
                border: none;
                min-height: 25px; /* was 45 */
            }
            QPushButton#primaryBtn:hover {background: #357ABD;}
            QLabel {font-size: 10px; color: #333;} /* was 14 */
            QTableWidget {
                gridline-color: #DDD;
                font-family: Consolas;
                font-size: 10px;   /* was 14 */
                border: 1px solid #EEE;
                border-radius: 4px;
                min-height: 200px; /* was 500 */
            }
            QTableWidget::item:selected {
                background: #E1F0FF;
                color: #333;
            }
            QStatusBar {font-size: 10px; color: #333; background: #F0F0F0;} /* was 14 */
            QLineEdit, QComboBox, QTextEdit {
                border: 1px solid #CCC;
                border-radius: 4px;
                padding: 4px 8px; /* was 6px 10px */
                font-size: 10px;  /* was 14 */
                min-height: 20px; /* was 35 */
            }
            QLineEdit:focus, QComboBox:focus, QTextEdit:focus {
                border: 1px solid #4A90E2;
                outline: none;
            }
        """)
   
    def _init_ui(self) -> None:
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)
        # Tighter spacing and margins (previously 20 and 25)
        main_layout.setSpacing(10)
        main_layout.setContentsMargins(10, 10, 10, 10)
        
        main_layout.addWidget(self._build_serial_config_widget())
        main_layout.addWidget(self._build_modbus_config_widget())
        # Stretch factor so the I/O area takes the extra space when maximized
        main_layout.addWidget(self._build_io_widget(), stretch=1)
        
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        self.status_bar.setFont(FONT_CONFIG["status"])
        self.status_bar.showMessage("就绪 | 未连接串口", 0)
   
    def _build_serial_config_widget(self) -> QGroupBox:
        group = QGroupBox("串口配置 | Serial Configuration")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        # Tighter spacing (previously 25 and 15)
        layout.setHorizontalSpacing(10)
        layout.setVerticalSpacing(5)
        
        layout.addWidget(QLabel("串口 Port:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.serial_combo = QComboBox()
        # Minimum width of the port selector (was 300)
        self.serial_combo.setMinimumWidth(150)
        self.serial_combo.setFont(FONT_CONFIG["mono"])
        # Expanding size policy so the box widens when the window is maximized
        self.serial_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        layout.addWidget(self.serial_combo, 0, 1)
        
        layout.addWidget(QLabel("波特率 Baud:"), 0, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.baudrate_combo = QComboBox()
        self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800"])
        self.baudrate_combo.setCurrentText("9600")
        self.baudrate_combo.setMinimumWidth(50)  # was 150
        layout.addWidget(self.baudrate_combo, 0, 3)
        
        layout.addWidget(QLabel("数据位 Data:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.databits_combo = QComboBox()
        self.databits_combo.addItems(["5", "6", "7", "8"])
        self.databits_combo.setCurrentText("8")
        self.databits_combo.setMinimumWidth(50)  # was 150
        layout.addWidget(self.databits_combo, 1, 1)
        
        layout.addWidget(QLabel("停止位 Stop:"), 1, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.stopbits_combo = QComboBox()
        self.stopbits_combo.addItems(["1", "1.5", "2"])
        self.stopbits_combo.setCurrentText("1")
        self.stopbits_combo.setMinimumWidth(50)  # was 150
        layout.addWidget(self.stopbits_combo, 1, 3)
        
        layout.addWidget(QLabel("校验位 Parity:"), 2, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.parity_combo = QComboBox()
        self.parity_combo.addItems(["无 None", "奇校验 Odd", "偶校验 Even"])
        self.parity_combo.setCurrentText("无 None")
        self.parity_combo.setMinimumWidth(50)  # was 150
        layout.addWidget(self.parity_combo, 2, 1)
        
        layout.addWidget(QLabel("流控 Flow:"), 2, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.rts_cts_check = QCheckBox("RTS/CTS")
        self.rts_cts_check.setMinimumHeight(20)  # was 30
        layout.addWidget(self.rts_cts_check, 2, 3)
        
        self.open_close_btn = QPushButton("打开串口 Open")
        self.open_close_btn.setObjectName("primaryBtn")
        self.open_close_btn.setMinimumWidth(50)  # was 150
        self.open_close_btn.setMinimumHeight(25)  # was 45
        self.open_close_btn.clicked.connect(self.toggle_serial)
        layout.addWidget(self.open_close_btn, 0, 4, 3, 1)
        
        return group
   
    def _build_modbus_config_widget(self) -> QGroupBox:
        group = QGroupBox("Modbus 配置 | RTU Master(01/02/03/04/05/06/15/16)")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        # Tighter spacing (was 25 and 18, now 15 and 10)
        layout.setHorizontalSpacing(15)
        layout.setVerticalSpacing(10)
        
        self.modbus_status_label = QLabel("Disconnected")
        self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 6px;")  # was 14
        layout.addWidget(self.modbus_status_label, 0, 0, 1, 2)
        
        status_widget = QWidget()
        status_layout = QHBoxLayout(status_widget)
        status_layout.setSpacing(15)  # was 30
        
        self.tx_label = QLabel("Tx = 0")
        self.tx_label.setStyleSheet("color: #2980B9; font-weight: bold;")
        self.err_label = QLabel("Err = 0")
        self.err_label.setStyleSheet("color: #E74C3C; font-weight: bold;")
        self.id_label = QLabel("ID = 1")
        self.f_label = QLabel("F = 03")
        self.sr_label = QLabel("SR = 1000ms")
        
        for lbl in [self.tx_label, self.err_label, self.id_label, self.f_label, self.sr_label]:
            lbl.setFont(FONT_CONFIG["mono"])
            lbl.setMinimumHeight(10)  # was 30
            status_layout.addWidget(lbl)
        status_layout.addStretch()
        
        layout.addWidget(status_widget, 0, 2, 1, 8)
        
        layout.addWidget(QLabel("从站地址 ID:"), 1, 0, Qt.AlignRight)
        self.slave_id_edit = QLineEdit("1")
        self.slave_id_edit.setMinimumWidth(50)  # was 120
        self.slave_id_edit.setMinimumHeight(10)  # was 35
        self.slave_id_edit.setFont(FONT_CONFIG["mono"])
        self.slave_id_edit.editingFinished.connect(lambda: self._update_modbus_status("id"))
        layout.addWidget(self.slave_id_edit, 1, 1)
        
        layout.addWidget(QLabel("功能码 Func:"), 1, 2, Qt.AlignRight)
        self.func_code_combo = QComboBox()
        self.func_code_combo.setMinimumWidth(100)  # was 280
        # Expanding size policy
        self.func_code_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        for code, (name, _, desc, _) in FUNC_CODE_MAP.items():
            self.func_code_combo.addItem(f"{code:02d} {name} ({desc})", code)
        self.func_code_combo.currentIndexChanged.connect(self._on_func_code_changed)
        layout.addWidget(self.func_code_combo, 1, 3)
        
        layout.addWidget(QLabel("起始地址 Addr:"), 1, 4, Qt.AlignRight)
        self.start_addr_edit = QLineEdit("0")
        self.start_addr_edit.setMinimumWidth(50)  # was 120
        self.start_addr_edit.setMinimumHeight(10)  # was 35
        self.start_addr_edit.setFont(FONT_CONFIG["mono"])
        self.start_addr_edit.editingFinished.connect(lambda: self._validate_addr_input())
        layout.addWidget(self.start_addr_edit, 1, 5)
        
        layout.addWidget(QLabel("数量 Qty:"), 1, 6, Qt.AlignRight)
        self.count_edit = QLineEdit("10")
        self.count_edit.setMinimumWidth(50)  # was 120
        self.count_edit.setMinimumHeight(10)  # was 35
        self.count_edit.setFont(FONT_CONFIG["mono"])
        self.count_edit.editingFinished.connect(self._adjust_table_rows)
        layout.addWidget(self.count_edit, 1, 7)
        
        layout.addWidget(QLabel("扫描周期 MS:"), 2, 0, Qt.AlignRight)
        self.interval_edit = QLineEdit("1000")
        self.interval_edit.setMinimumWidth(100)  # was 120
        self.interval_edit.setMinimumHeight(20)  # was 35
        self.interval_edit.setFont(FONT_CONFIG["mono"])
        self.interval_edit.editingFinished.connect(lambda: self._update_modbus_status("interval"))
        layout.addWidget(self.interval_edit, 2, 1)
        
        layout.addWidget(QLabel("写操作值 Value:"), 2, 2, Qt.AlignRight)
        self.write_value_edit = QLineEdit()
        self.write_value_edit.setMinimumWidth(150)  # was 280
        self.write_value_edit.setMinimumHeight(10)  # was 35
        self.write_value_edit.setFont(FONT_CONFIG["mono"])
        self.write_value_edit.setPlaceholderText("线圈:0/1/True/False | 寄存器:0-65535 | 多值用逗号分隔")
        self.write_value_edit.setEnabled(False)
        # Expanding size policy
        self.write_value_edit.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        layout.addWidget(self.write_value_edit, 2, 3, 1, 3)
        
        layout.addWidget(QLabel("显示模式:"), 2, 6, Qt.AlignRight)
        self.display_mode_combo = QComboBox()
        self.display_mode_combo.setMinimumWidth(50)  # was 120
        self.display_mode_combo.addItems(list(DISPLAY_MODE_MAP.keys()))
        self.display_mode_combo.currentTextChanged.connect(self._on_display_mode_changed)
        layout.addWidget(self.display_mode_combo, 2, 7)
        
        self.modbus_btn = QPushButton("开始轮询 Start Poll")
        self.modbus_btn.setObjectName("primaryBtn")
        self.modbus_btn.setMinimumWidth(50)  # was 160
        self.modbus_btn.setMinimumHeight(10)  # was 45
        self.modbus_btn.clicked.connect(self.toggle_modbus_operation)
        self.modbus_btn.setEnabled(False)
        layout.addWidget(self.modbus_btn, 3, 0, 1, 2)
        
        self.clear_modbus_btn = QPushButton("清空数据 Clear")
        self.clear_modbus_btn.setMinimumWidth(120)  # was 160
        self.clear_modbus_btn.setMinimumHeight(20)  # was 40
        self.clear_modbus_btn.clicked.connect(self.clear_modbus_table)
        layout.addWidget(self.clear_modbus_btn, 3, 2, 1, 2)
        
        self.clear_modbus_log_btn = QPushButton("清空日志 Clear Log")
        self.clear_modbus_log_btn.setMinimumWidth(50)  # was 160
        self.clear_modbus_log_btn.setMinimumHeight(10)  # was 40
        self.clear_modbus_log_btn.clicked.connect(self.clear_receive)
        layout.addWidget(self.clear_modbus_log_btn, 3, 4, 1, 2)
        
        self.manual_write_btn = QPushButton("执行写操作 Write")
        self.manual_write_btn.setObjectName("primaryBtn")
        self.manual_write_btn.setMinimumWidth(50)  # was 160
        self.manual_write_btn.setMinimumHeight(10)  # was 45
        self.manual_write_btn.clicked.connect(self.execute_manual_write)
        self.manual_write_btn.setEnabled(False)
        layout.addWidget(self.manual_write_btn, 3, 6, 1, 2)
        
        # Table: no fixed widths; columns size themselves
        self.modbus_table = QTableWidget()
        self.modbus_table.setColumnCount(2)
        self.modbus_table.setHorizontalHeaderLabels(["Address (Dec)", "Value"])
        # Address column fits its contents; Value column stretches
        self.modbus_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
        self.modbus_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.modbus_table.setRowCount(10)
        self.modbus_table.setAlternatingRowColors(True)
        self.modbus_table.setMinimumHeight(100)  # was 500
        # Expanding size policy so the table fills the available space when maximized
        self.modbus_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self._init_modbus_table()
        layout.addWidget(self.modbus_table, 4, 0, 1, 8)
        
        return group
   
    def _build_io_widget(self) -> QWidget:
        widget = QWidget()
        layout = QHBoxLayout(widget)
        layout.setSpacing(10)  # was 25
        
        receive_group = QGroupBox("接收日志 | Receive Log")
        receive_group.setFont(FONT_CONFIG["title"])
        receive_layout = QVBoxLayout(receive_group)
        # Expanding size policy for the receive area
        receive_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        
        receive_ctrl = QHBoxLayout()
        self.hex_receive_check = QCheckBox("十六进制显示 Hex")
        self.timestamp_check = QCheckBox("显示时间戳 Time")
        self.timestamp_check.setChecked(True)
        self.auto_wrap_check = QCheckBox("自动换行 Wrap")
        self.auto_wrap_check.setChecked(True)
        self.auto_scroll_check = QCheckBox("自动滚屏 Scroll")
        self.auto_scroll_check.setChecked(True)
        
        self.log_filter_combo = QComboBox()
        self.log_filter_combo.addItems(["全部日志", "仅Modbus", "仅串口"])
        self.log_filter_combo.setMinimumWidth(80)  # 原150 → 120
        self.log_filter_combo.currentTextChanged.connect(self._filter_receive_log)
        
        for cb in [self.hex_receive_check, self.timestamp_check, self.auto_wrap_check, self.auto_scroll_check]:
            cb.setMinimumHeight(20)  # 原30 → 25
        
        receive_ctrl.addWidget(self.hex_receive_check)
        receive_ctrl.addWidget(self.timestamp_check)
        receive_ctrl.addWidget(self.auto_wrap_check)
        receive_ctrl.addWidget(self.auto_scroll_check)
        receive_ctrl.addWidget(QLabel("日志过滤:"))
        receive_ctrl.addWidget(self.log_filter_combo)
        receive_ctrl.addStretch()
        
        self.copy_receive_btn = QPushButton("复制 Copy")
        self.copy_receive_btn.setMinimumWidth(80)  # 原100 → 80
        self.copy_receive_btn.setMinimumHeight(20)  # 原40 → 35
        self.copy_receive_btn.clicked.connect(self.copy_receive_content)
        
        self.save_log_btn = QPushButton("保存 Save")
        self.save_log_btn.setMinimumWidth(80)  # 原100 → 80
        self.save_log_btn.setMinimumHeight(20)  # 原40 → 35
        self.save_log_btn.clicked.connect(self.save_receive_log)
        
        self.clear_receive_btn = QPushButton("清空 Clear")
        self.clear_receive_btn.setMinimumWidth(80)  # 原100 → 80
        self.clear_receive_btn.setMinimumHeight(20)  # 原40 → 35
        self.clear_receive_btn.clicked.connect(self.clear_receive)
        
        receive_ctrl.addWidget(self.copy_receive_btn)
        receive_ctrl.addWidget(self.save_log_btn)
        receive_ctrl.addWidget(self.clear_receive_btn)
        receive_layout.addLayout(receive_ctrl)
        
        self.receive_text = QTextEdit()
        self.receive_text.setReadOnly(True)
        self.receive_text.setFont(FONT_CONFIG["mono"])
        self.receive_text.setLineWrapMode(QTextEdit.WidgetWidth if self.auto_wrap_check.isChecked() else QTextEdit.NoWrap)
        self.receive_text.setMinimumHeight(100)  # 原400 → 250
        # 设置接收框拉伸策略
        self.receive_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self.auto_wrap_check.clicked.connect(self._toggle_auto_wrap)
        receive_layout.addWidget(self.receive_text)
        
        self.byte_count_label = QLabel("总接收字节:0 | 本次会话:0")
        self.byte_count_label.setAlignment(Qt.AlignRight)
        self.byte_count_label.setFont(FONT_CONFIG["mono"])
        self.byte_count_label.setMinimumHeight(10)  # 原30 → 25
        receive_layout.addWidget(self.byte_count_label)
        
        send_group = QGroupBox("发送数据 | Transmit Data")
        send_group.setFont(FONT_CONFIG["title"])
        send_layout = QVBoxLayout(send_group)
        # 设置发送区拉伸策略(宽度占比降低)
        send_group.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Expanding)
        
        send_ctrl = QHBoxLayout()
        self.hex_send_check = QCheckBox("十六进制发送 Hex")
        self.auto_clear_send_check = QCheckBox("发送后清空 Clear After Send")
        self.add_newline_check = QCheckBox("自动加换行 Newline")
        self.auto_crc_check = QCheckBox("自动计算Modbus CRC")
        self.auto_crc_check.setChecked(True)
        
        for cb in [self.hex_send_check, self.auto_clear_send_check, self.add_newline_check, self.auto_crc_check]:
            cb.setMinimumHeight(10)  # 原30 → 25
        
        send_ctrl.addWidget(self.hex_send_check)
        send_ctrl.addWidget(self.auto_clear_send_check)
        send_ctrl.addWidget(self.add_newline_check)
        send_ctrl.addWidget(self.auto_crc_check)
        send_ctrl.addStretch()
        
        self.loop_send_check = QCheckBox("循环发送 Loop")
        self.loop_send_check.setMinimumHeight(15)  # 原30 → 25
        self.loop_interval_edit = QLineEdit("1000")
        self.loop_interval_edit.setMinimumWidth(50)  # 原100 → 80
        self.loop_interval_edit.setMinimumHeight(10)  # 原35 → 30
        self.loop_interval_edit.setPlaceholderText("间隔(ms)")
        self.loop_interval_edit.setFont(FONT_CONFIG["mono"])
        self.loop_interval_edit.setEnabled(False)
        self.loop_send_check.clicked.connect(lambda: self.loop_interval_edit.setEnabled(self.loop_send_check.isChecked()))
        send_ctrl.addWidget(self.loop_send_check)
        send_ctrl.addWidget(QLabel("间隔:"))
        send_ctrl.addWidget(self.loop_interval_edit)
        send_layout.addLayout(send_ctrl)
        
        self.send_text = QTextEdit()
        self.send_text.setFont(FONT_CONFIG["mono"])
        self.send_text.setMinimumHeight(50)  # 原150 → 100
        # 设置发送框拉伸策略
        self.send_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        send_layout.addWidget(self.send_text)
        
        send_btn_layout = QHBoxLayout()
        self.send_btn = QPushButton("发送 Send")
        self.send_btn.setObjectName("primaryBtn")
        self.send_btn.setMinimumWidth(50)  # 原120 → 100
        self.send_btn.setMinimumHeight(10)  # 原45 → 40
        self.send_btn.clicked.connect(self.send_data)
        self.send_btn.setEnabled(False)
        
        self.stop_loop_btn = QPushButton("停止循环 Stop")
        self.stop_loop_btn.setMinimumWidth(50)  # 原120 → 100
        self.stop_loop_btn.setMinimumHeight(10)  # 原40 → 35
        self.stop_loop_btn.clicked.connect(self.stop_loop_send)
        self.stop_loop_btn.setEnabled(False)
        
        send_btn_layout.addWidget(self.send_btn)
        send_btn_layout.addWidget(self.stop_loop_btn)
        send_btn_layout.addStretch()
        send_layout.addLayout(send_btn_layout)
        
        # Stretch weights for the receive and send panes (7:1)
        layout.addWidget(receive_group, stretch=7)
        layout.addWidget(send_group, stretch=1)
        
        return widget
   
    # ===================== UI helper methods =====================
    def _init_modbus_table(self) -> None:
        for i in range(self.modbus_table.rowCount()):
            addr_item = QTableWidgetItem(f"{i:05d}")
            addr_item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
            addr_item.setBackground(QColor(248, 248, 248))
            addr_item.setFont(FONT_CONFIG["mono"])
            self.modbus_table.setItem(i, 0, addr_item)
            
            val_item = QTableWidgetItem("0")
            val_item.setFont(FONT_CONFIG["mono"])
            self.modbus_table.setItem(i, 1, val_item)
        
        # 行高适配(原35 → 10)
        for i in range(self.modbus_table.rowCount()):
            self.modbus_table.setRowHeight(i, 10)
   
    def _adjust_table_rows(self) -> None:
        count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
        if count:
            self.modbus_table.setRowCount(count)
            for i in range(self.modbus_table.rowCount()):
                self.modbus_table.setRowHeight(i, 10)
            self._init_modbus_table()
   
    def _on_func_code_changed(self) -> None:
        func_code = self.func_code_combo.currentData()
        is_write = FUNC_CODE_MAP[func_code][1] == "write"
        
        self.write_value_edit.setEnabled(is_write)
        self.manual_write_btn.setEnabled(is_write and self.is_serial_open)
        self.interval_edit.setEnabled(not is_write)
        
        self.f_label.setText(f"F = {func_code:02d}")
        if is_write:
            self.modbus_btn.setText("写操作需点击「执行写操作」")
            self.modbus_btn.setEnabled(False)
            if func_code in [5, 15]:
                self.write_value_edit.setPlaceholderText("线圈值:0/1/True/False | 多值用逗号分隔")
            else:
                self.write_value_edit.setPlaceholderText("寄存器值:0-65535 | 多值用逗号分隔")
        else:
            self.modbus_btn.setText("开始轮询 Start Poll")
            self.modbus_btn.setEnabled(self.is_serial_open)
            self.write_value_edit.setPlaceholderText("写操作值 Value:线圈:0/1,寄存器:十进制,多值用逗号分隔")
        
        self._validate_addr_input()
   
    def _validate_addr_input(self) -> None:
        func_code = self.func_code_combo.currentData()
        addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
        if addr is not None and not validate_modbus_addr(func_code, addr):
            self.start_addr_edit.setText(str(FUNC_CODE_MAP[func_code][3][0]))
   
    def _on_display_mode_changed(self, text: str) -> None:
        self.display_mode = DISPLAY_MODE_MAP[text]
        for row in range(self.modbus_table.rowCount()):
            val_item = self.modbus_table.item(row, 1)
            if val_item:
                try:
                    # Base 0 lets int() honour the 0x / 0b prefix written by format_value.
                    # Stripping the prefix first would reject zero-padded hex/binary text.
                    raw_val = int(val_item.text().strip(), 0)
                    val_item.setText(format_value(raw_val, self.display_mode))
                except ValueError:
                    # Leave cells that do not hold a number untouched
                    pass
   
    def _toggle_auto_wrap(self) -> None:
        mode = QTextEdit.WidgetWidth if self.auto_wrap_check.isChecked() else QTextEdit.NoWrap
        self.receive_text.setLineWrapMode(mode)
   
    def _update_modbus_status(self, type_: str) -> None:
        if type_ == "id":
            slave_id = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
            if slave_id:
                self.id_label.setText(f"ID = {slave_id}")
        elif type_ == "interval":
            interval = validate_numeric_input(self.interval_edit.text(), 10, 30000, "扫描周期")
            if interval:
                self.sr_label.setText(f"SR = {interval}ms")
   
    def _filter_receive_log(self, filter_type: str) -> None:
        original_text = self.receive_text.toPlainText()
        lines = original_text.split("\n")
        filtered_lines = []
        
        for line in lines:
            if not line.strip():
                continue
            if filter_type == "全部日志":
                filtered_lines.append(line)
            elif filter_type == "仅Modbus" and "[Modbus-" in line:
                filtered_lines.append(line)
            elif filter_type == "仅串口" and "[串口-" in line:
                filtered_lines.append(line)
        
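        # NOTE: filtering rewrites the log in place, so lines removed here cannot be
        # restored by switching the filter back to "全部日志".
        self.receive_text.setPlainText("\n".join(filtered_lines))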
   
    # ===================== Serial port operations =====================
    def scan_serial_ports(self) -> None:
        current_port = self.serial_combo.currentText()
        valid_ports = scan_valid_ports()
        
        current_ports = [self.serial_combo.itemText(i) for i in range(self.serial_combo.count())]
        if valid_ports != current_ports:
            self.serial_combo.clear()
            self.serial_combo.addItems(valid_ports)
            if current_port and current_port in valid_ports:
                self.serial_combo.setCurrentText(current_port)
   
    def toggle_serial(self) -> None:
        if not self.is_serial_open:
            try:
                if not self.serial_combo.currentText():
                    QMessageBox.warning(self, "串口错误", "请选择有效的串口")
                    return
                port = self.serial_combo.currentText().split(" - ")[0]
                self.serial.port = port
                self.serial.baudrate = int(self.baudrate_combo.currentText())
                self.serial.bytesize = int(self.databits_combo.currentText())
                self.serial.stopbits = float(self.stopbits_combo.currentText())
                self.serial.parity = {
                    "无 None": serial.PARITY_NONE,
                    "奇校验 Odd": serial.PARITY_ODD,
                    "偶校验 Even": serial.PARITY_EVEN
                }[self.parity_combo.currentText()]
                self.serial.timeout = 0.1
                self.serial.rtscts = self.rts_cts_check.isChecked()
                self.serial.xonxoff = False
                self.serial.dsrdtr = False
               
                self.serial.open()
                self.is_serial_open = True
                self.open_close_btn.setText("关闭串口 Close")
                self.send_btn.setEnabled(True)
               
                func_code = self.func_code_combo.currentData()
                self.modbus_btn.setEnabled(FUNC_CODE_MAP[func_code][1] == "read")
                self.manual_write_btn.setEnabled(FUNC_CODE_MAP[func_code][1] == "write")
               
                self.modbus_status_label.setText("Connected")
                self.modbus_status_label.setStyleSheet("color: #27AE60; font-weight: bold; font-size: 12px;")
                self.status_bar.showMessage(f"串口已打开:{port} | 波特率:{self.serial.baudrate}", 3000)
               
                self.receive_thread = SerialReceiveThread(self.serial)
                self.receive_thread.receive_signal.connect(self.handle_receive_data)
                self.receive_thread.error_signal.connect(lambda msg: self.status_bar.showMessage(msg, 3000))
                self.receive_thread.start()
                self.session_receive_bytes = 0
            except Exception as e:
                QMessageBox.critical(self, "串口错误", f"打开串口失败:{str(e)}\n请检查串口是否被占用")
        else:
            self.stop_all_operations()
            self.serial.close()
            self.is_serial_open = False
            self.open_close_btn.setText("打开串口 Open")
            self.send_btn.setEnabled(False)
            self.modbus_btn.setEnabled(False)
            self.manual_write_btn.setEnabled(False)
            self.modbus_status_label.setText("Disconnected")
            self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 12px;")
            self.status_bar.showMessage("串口已关闭", 3000)
   
    # ===================== Sending and receiving data =====================
    def handle_receive_data(self, data: bytes) -> None:
        self.total_receive_bytes += len(data)
        self.session_receive_bytes += len(data)
        self.byte_count_label.setText(f"总接收字节:{self.total_receive_bytes} | 本次会话:{self.session_receive_bytes}")
        
        timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz") if self.timestamp_check.isChecked() else ""
        if self.hex_receive_check.isChecked():
            display_data = data.hex(' ')
        else:
            # errors='replace' never raises, so no fallback branch is needed
            display_data = data.decode('utf-8', errors='replace')
        
        log_prefix = f"[串口-接收][{timestamp}] " if timestamp else "[串口-接收] "
        log_line = f"{log_prefix}{display_data}\n"
        self.receive_text.append(log_line)
        
        if self.auto_scroll_check.isChecked():
            self.receive_text.moveCursor(self.receive_text.textCursor().End)
   
    def send_data(self) -> None:
        if not self.is_serial_open:
            QMessageBox.warning(self, "错误", "串口未打开")
            return
        try:
            send_text = self.send_text.toPlainText().strip()
            if not send_text:
                return
            
            if self.hex_send_check.isChecked():
                send_hex = send_text.replace(" ", "").replace("\n", "").replace("\r", "")
                if len(send_hex) % 2 != 0:
                    raise ValueError("十六进制数据长度必须为偶数")
               
                send_data = bytes.fromhex(send_hex)
               
                if self.auto_crc_check.isChecked() and len(send_data) >= 2:
                    if len(send_data) >= 4 and send_data[-2:] == modbus_crc16(send_data[:-2]):
                        send_data = send_data[:-2]
                    send_data += modbus_crc16(send_data)
                    self.status_bar.showMessage(f"自动计算CRC:{send_data[-2:].hex()}", 2000)
            else:
                send_data = send_text.encode('utf-8')
                if self.add_newline_check.isChecked():
                    send_data += b'\r\n'
            
            self.serial.write(send_data)
            self.status_bar.showMessage(f"发送成功:{len(send_data)} 字节", 2000)
            
            timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz") if self.timestamp_check.isChecked() else ""
            log_prefix = f"[串口-发送][{timestamp}] " if timestamp else "[串口-发送] "
            
            if self.hex_receive_check.isChecked():
                log_data = send_data.hex(' ')
            else:
                log_data = send_data.decode('utf-8', errors='replace')
            
            self.receive_text.append(f"{log_prefix}{log_data}\n")
            
            if self.auto_clear_send_check.isChecked():
                self.send_text.clear()
            
            if self.loop_send_check.isChecked() and not self.is_loop_sending:
                interval = validate_numeric_input(self.loop_interval_edit.text(), 10, 30000, "循环间隔") or 1000
                self.loop_send_timer.start(interval)
                self.is_loop_sending = True
                self.send_btn.setEnabled(False)
                self.stop_loop_btn.setEnabled(True)
                self.status_bar.showMessage(f"开始循环发送(间隔{interval}ms)", 2000)
        except Exception as e:
            err_msg = f"发送失败:{str(e)}"
            self.status_bar.showMessage(err_msg, 2000)
            QMessageBox.critical(self, "错误", err_msg)
   
    def stop_loop_send(self) -> None:
        self.loop_send_timer.stop()
        self.is_loop_sending = False
        self.send_btn.setEnabled(True)
        self.stop_loop_btn.setEnabled(False)
        self.status_bar.showMessage("循环发送已停止", 2000)
   
    # ===================== Modbus operations =====================
    def toggle_modbus_operation(self) -> None:
        if not self.modbus_thread or not self.modbus_thread.isRunning():
            slave_id = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
            func_code = self.func_code_combo.currentData()
            start_addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
            count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
            interval = validate_numeric_input(self.interval_edit.text(), 10, 30000, "扫描周期")
            
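            # Compare against None explicitly: a start address of 0 is valid but falsy,
            # so all([...]) would silently refuse to poll from address 0.
            if any(v is None for v in (slave_id, func_code, start_addr, count, interval)):
                return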
            
            if not validate_modbus_addr(func_code, start_addr):
                return
            
            self.modbus_thread = ModbusThread(self.serial)
            self.modbus_thread.set_read_params(slave_id, func_code, start_addr, count, interval)
            self.modbus_thread.poll_result.connect(self.update_modbus_table)
            self.modbus_thread.status_update.connect(lambda msg: self.status_bar.showMessage(msg, 2000))
            self.modbus_thread.log_update.connect(self._append_modbus_log)
            self.modbus_thread.start()
            self.modbus_btn.setText("停止轮询 Stop Poll")
            self.status_bar.showMessage(f"开始Modbus轮询 | 从站{slave_id} 功能码{func_code:02d}", 2000)
        else:
            self.stop_modbus_operation()
            self.modbus_btn.setText("开始轮询 Start Poll")
            self.status_bar.showMessage("Modbus轮询已停止", 2000)
   
    def stop_modbus_operation(self) -> None:
        if self.modbus_thread and self.modbus_thread.isRunning():
            self.modbus_thread.stop()
            self.modbus_thread.wait()
   
    def update_modbus_table(self, data_list: List[int | bool], tx_count: int, err_count: int) -> None:
        self.tx_label.setText(f"Tx = {tx_count}")
        self.err_label.setText(f"Err = {err_count}")
        
        for row in range(self.modbus_table.rowCount()):
            if row < len(data_list):
                val = data_list[row]
                formatted_val = format_value(val, self.display_mode)
                val_item = QTableWidgetItem(formatted_val)
                val_item.setFont(FONT_CONFIG["mono"])
                self.modbus_table.setItem(row, 1, val_item)
            else:
                val_item = QTableWidgetItem("0")
                val_item.setFont(FONT_CONFIG["mono"])
                self.modbus_table.setItem(row, 1, val_item)
   
    def _append_modbus_log(self, log_type: str, content: str) -> None:
        timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz") if self.timestamp_check.isChecked() else ""
        log_prefix = f"[{log_type}][{timestamp}] " if timestamp else f"[{log_type}] "
        self.receive_text.append(f"{log_prefix}{content}\n")
        
        if self.auto_scroll_check.isChecked():
            self.receive_text.moveCursor(self.receive_text.textCursor().End)
   
    def execute_manual_write(self) -> None:
        if not self.is_serial_open:
            QMessageBox.warning(self, "错误", "串口未打开")
            return
        
        slave_id = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
        func_code = self.func_code_combo.currentData()
        start_addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
        count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
        
        # Compare against None explicitly: a start address of 0 is valid but falsy
        if any(v is None for v in (slave_id, func_code, start_addr, count)):
            return
        
        if not validate_modbus_addr(func_code, start_addr):
            return
        
        write_values = parse_write_values(self.write_value_edit.text(), func_code, count)
        if not write_values:
            return
        
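        # Stop any poll still running before replacing the thread object, so the old
        # QThread is not destroyed while active and the port has a single user.
        self.stop_modbus_operation()
        self.modbus_thread = ModbusThread(self.serial)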
        self.modbus_thread.set_write_params(slave_id, func_code, start_addr, count, write_values)
        self.modbus_thread.write_result.connect(self.handle_write_result)
        self.modbus_thread.status_update.connect(lambda msg: self.status_bar.showMessage(msg, 2000))
        self.modbus_thread.log_update.connect(self._append_modbus_log)
        self.modbus_thread.start()
   
    def handle_write_result(self, success: bool, msg: str) -> None:
        if success:
            QMessageBox.information(self, "操作成功", msg)
        else:
            QMessageBox.critical(self, "操作失败", msg)
   
    def clear_modbus_table(self) -> None:
        for row in range(self.modbus_table.rowCount()):
            val_item = QTableWidgetItem("0")
            val_item.setFont(FONT_CONFIG["mono"])
            self.modbus_table.setItem(row, 1, val_item)
   
    def clear_receive(self) -> None:
        self.receive_text.clear()
        self.session_receive_bytes = 0
        self.byte_count_label.setText(f"总接收字节:{self.total_receive_bytes} | 本次会话:0")
   
    def copy_receive_content(self) -> None:
        clipboard = QApplication.clipboard()
        clipboard.setText(self.receive_text.toPlainText())
        self.status_bar.showMessage("接收日志已复制到剪贴板", 2000)
   
    def save_receive_log(self) -> None:
        file_path, _ = QFileDialog.getSaveFileName(self, "保存日志", f"modbus_log_{QDateTime.currentDateTime().toString('yyyyMMddhhmmss')}.txt", "Text Files (*.txt);;All Files (*)")
        if file_path:
            try:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(self.receive_text.toPlainText())
                self.status_bar.showMessage(f"日志已保存到:{file_path}", 3000)
            except Exception as e:
                QMessageBox.critical(self, "保存失败", f"日志保存失败:{str(e)}")
   
    def stop_all_operations(self) -> None:
        self.stop_loop_send()
        self.stop_modbus_operation()
        if self.receive_thread and self.receive_thread.isRunning():
            self.receive_thread.stop()
            self.receive_thread.wait()



# ===================== Program entry point =====================
if __name__ == "__main__":
    # Enable high-DPI scaling (must be set before QApplication is created)
    QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
    QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)
   
    app = QApplication(sys.argv)
    app.setFont(FONT_CONFIG["default"])
   
    window = ModbusRTUMainWindow()
    sys.exit(app.exec_())
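
A note on the auto-CRC option in send_data above: it drops a trailing CRC that already validates and then appends a fresh one, so a frame never ends up with two checksums. Below is a minimal sketch of that step that runs without the GUI. `with_modbus_crc` is a hypothetical helper name, not part of the tool, and `modbus_crc16` is restated so the snippet is self-contained.

```python
def modbus_crc16(data: bytes) -> bytes:
    """CRC-16/MODBUS (init 0xFFFF, reflected poly 0xA001), low byte first."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
    return bytes([crc & 0xFF, (crc >> 8) & 0xFF])


def with_modbus_crc(frame: bytes) -> bytes:
    """Hypothetical helper: return the frame with exactly one valid CRC."""
    # Same check as send_data: a frame of 4+ bytes whose last two bytes
    # already match the CRC of the rest is treated as "CRC included".
    if len(frame) >= 4 and frame[-2:] == modbus_crc16(frame[:-2]):
        frame = frame[:-2]
    return frame + modbus_crc16(frame)


# Read 1 holding register from slave 1, address 0 (function code 03)
request = bytes.fromhex("01 03 00 00 00 01")
print(with_modbus_crc(request).hex(" "))  # 01 03 00 00 00 01 84 0a
```

Feeding the result back in returns it unchanged, which is why pasting a frame that already carries its CRC is safe with the option ticked.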

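One thing to watch with the input checks in the listing above: the validator returns None on failure and the number otherwise, and 0 is a perfectly good Modbus start address. A truthiness test such as `all([...])` cannot tell the two apart. A small sketch of the difference follows; `parse_in_range` and `params_ok` are hypothetical names, with `parse_in_range` standing in for `validate_numeric_input` minus the message boxes.

```python
from typing import Optional


def parse_in_range(text: str, lo: int, hi: int) -> Optional[int]:
    """Hypothetical GUI-free stand-in for validate_numeric_input."""
    try:
        value = int(text)
    except ValueError:
        return None
    return value if lo <= value <= hi else None


def params_ok(*values: Optional[int]) -> bool:
    """True when every field parsed, even if some of them parsed to 0."""
    return all(v is not None for v in values)


start_addr = parse_in_range("0", 0, 65535)   # valid input, parses to 0
bad_addr = parse_in_range("70000", 0, 65535)  # out of range, gives None

print(all([1, 3, start_addr, 10]))       # False: 0 is falsy
print(params_ok(1, 3, start_addr, 10))   # True
print(params_ok(1, 3, bad_addr, 10))     # False
```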

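On switching the table between decimal, hex and binary: the cell text already carries a `0x` or `0b` prefix, and `int(text, 0)` reads that prefix directly, so the text can be parsed back without any string surgery. A minimal sketch, assuming the same output formats as `format_value` (bool handling left out); `convert_cell` is a hypothetical name.

```python
def format_value(value: int, mode: str) -> str:
    """Same formats as the tool: 0x%04X for hex, 0b + 16 bits for binary."""
    if mode == "hex":
        return f"0x{value:04X}"
    if mode == "bin":
        return f"0b{value:016b}"
    return str(value)


def convert_cell(text: str, mode: str) -> str:
    """Hypothetical helper: re-render one table cell in another display mode."""
    # Base 0 makes int() pick the base from the prefix (0x, 0b, or none).
    return format_value(int(text.strip(), 0), mode)


print(convert_cell("255", "hex"))                 # 0x00FF
print(convert_cell("0x00FF", "bin"))              # 0b0000000011111111
print(convert_cell("0b0000000011111111", "dec"))  # 255
```

Stripping the prefix first does not work with base 0: zero-padded text such as `00FF` or `0010` raises ValueError.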

Updated version
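
The updated version records traffic to CSV or JSON. For the CSV case the header row should be written once, when the file is new or empty. A sketch of one way to do that in a single open() call: in append mode the file position starts at the end, so `tell() == 0` means there is nothing in the file yet. `append_csv_record`, the English column names and the sample rows (including the port name COM3) are made up for illustration.

```python
import csv
import os
import tempfile

HEADER = ["timestamp", "conn_type", "conn_info", "direction", "data"]


def append_csv_record(path: str, row: list) -> None:
    """Hypothetical helper: append one row, writing the header only once."""
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if f.tell() == 0:  # new or empty file
            writer.writerow(HEADER)
        writer.writerow(row)


path = os.path.join(tempfile.mkdtemp(), "records.csv")
append_csv_record(path, ["2025-12-19 11:09:00", "serial", "COM3", "send", "01 03 00 00 00 01 84 0a"])
append_csv_record(path, ["2025-12-19 11:09:01", "serial", "COM3", "receive", "01 03 02 00 2a"])

with open(path, newline="", encoding="utf-8") as f:
    rows = list(csv.reader(f))
print(len(rows))  # header + 2 records
```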


[Python]
import sys
import serial
import json
import csv
import socket
import asyncio
from datetime import datetime
from typing import List, Optional, Any, Dict, Tuple
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, 
                            QGroupBox, QLabel, QComboBox, QLineEdit, QPushButton, QCheckBox, 
                            QTextEdit, QTableWidget, QTableWidgetItem, QStatusBar, QMessageBox, 
                            QFileDialog, QHeaderView, QSizePolicy, QTabWidget, QSpinBox)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer, QDateTime
from PyQt5.QtGui import QPalette, QColor, QFont, QBrush
import bleak
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic

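# ===================== Constants (fonts and widget-size fallbacks) =====================
FONT_CONFIG = {
    "title": QFont("SimHei", 14, QFont.Bold),  # group titles (slightly enlarged, not enough to squeeze the layout)
    "subtitle": QFont("SimHei", 12, QFont.Bold),  # child-widget labels (size tuned so text is always shown)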
    "mono": QFont("Consolas", 11, QFont.Bold),
    "status": QFont("SimHei", 11, QFont.Bold)
}

DISPLAY_MODE_MAP = {
    "十进制": "dec",
    "十六进制": "hex",
    "二进制": "bin"
}

FUNC_CODE_MAP = {
    1: ("读线圈状态", "read", "0x01", (0, 65535)),
    2: ("读离散输入", "read", "0x02", (0, 65535)),
    3: ("读保持寄存器", "read", "0x03", (0, 65535)),
    4: ("读输入寄存器", "read", "0x04", (0, 65535)),
    5: ("写单个线圈", "write", "0x05", (0, 65535)),
    6: ("写单个寄存器", "write", "0x06", (0, 65535)),
    15: ("写多个线圈", "write", "0x0F", (0, 65535)),
    16: ("写多个寄存器", "write", "0x10", (0, 65535))
}

# Added: data-recording constants
RECORD_FORMAT_OPTIONS = ["CSV", "JSON"]
RECORD_STATUS = {
    "STOPPED": "未记录",
    "RECORDING": "记录中"
}

# Added: Bluetooth constants
BLUETOOTH_CHARACTERISTIC_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb"  # UUID commonly used by BLE serial modules

# Added: network constants
NETWORK_PROTOCOLS = ["TCP客户端", "TCP服务器", "UDP"]

# ===================== Utility functions =====================
def modbus_crc16(data: bytes) -> bytes:
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
    return bytes([crc & 0xFF, (crc >> 8) & 0xFF])

def xor_checksum(data: bytes) -> bytes:
    xor_result = 0x00
    for byte in data:
        xor_result ^= byte
    return bytes([xor_result])

def _add_checksum(data: bytes, checksum_type: str, debugger=None) -> bytes:
    """Append a checksum; `debugger`, if given, supplies calculate_crc16 / calculate_xor."""
    if checksum_type == 'CRC-16':
        crc = debugger.calculate_crc16(data) if debugger else modbus_crc16(data)
        return data + crc
    if checksum_type == 'XOR':
        if debugger:
            return data + bytes([debugger.calculate_xor(data)])
        return data + xor_checksum(data)
    return data

def validate_numeric_input(text: str, min_val: int, max_val: int, field_name: str) -> Optional[int]:
    try:
        value = int(text)
        if min_val <= value <= max_val:
            return value
        else:
            QMessageBox.warning(None, "输入错误", f"{field_name}必须在{min_val}-{max_val}之间")
            return None
    except ValueError:
        QMessageBox.warning(None, "输入错误", f"{field_name}必须是有效的整数")
        return None

def validate_modbus_addr(func_code: int, addr: int) -> bool:
    min_addr, max_addr = FUNC_CODE_MAP[func_code][3]
    if not (min_addr <= addr <= max_addr):
        QMessageBox.warning(None, "地址错误", f"功能码{func_code}的地址范围是{min_addr}-{max_addr}")
        return False
    return True

def format_value(value: int | bool, mode: str) -> str:
    if isinstance(value, bool):
        return "1" if value else "0"
    if mode == "hex":
        return f"0x{value:04X}"
    elif mode == "bin":
        return f"0b{value:016b}"
    else:
        return str(value)

def parse_write_values(text: str, func_code: int, count: int) -> Optional[List[int]]:
    try:
        values = []
        text = text.strip()
        if not text:
            QMessageBox.warning(None, "输入错误", "请输入要写入的值")
            return None
            
        parts = [p.strip() for p in text.split(",")]
        
        if func_code in [5, 15]:
            for part in parts:
                if part.lower() in ["0", "false"]:
                    values.append(0)
                elif part.lower() in ["1", "true"]:
                    values.append(1)
                else:
                    QMessageBox.warning(None, "输入错误", "线圈值必须是0/1或True/False")
                    return None
        else:
            for part in parts:
                val = int(part, 0)
                if 0 <= val <= 65535:
                    values.append(val)
                else:
                    QMessageBox.warning(None, "输入错误", "寄存器值必须在0-65535之间")
                    return None
        
        # Function codes 05/06 write a single value: accept one value and
        # repeat it `count` times (the original replicated an already full list).
        if func_code in [5, 6] and len(values) == 1:
            return values * count

        if len(values) != count:
            QMessageBox.warning(None, "输入错误", f"值的数量必须为{count}个")
            return None

        return values
    except Exception as e:
        QMessageBox.warning(None, "解析错误", f"值解析失败:{str(e)}")
        return None

def scan_valid_ports() -> List[str]:
    import serial.tools.list_ports
    ports = []
    for port in serial.tools.list_ports.comports():
        ports.append(f"{port.device} - {port.description}")
    return ports

# Added: configuration file helpers
def save_config(config: Dict, file_path: str) -> bool:
    """Save the configuration to a JSON file."""
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False, indent=4)
        return True
    except Exception as e:
        QMessageBox.warning(None, "保存失败", f"配置保存失败: {str(e)}")
        return False

def load_config(file_path: str) -> Optional[Dict]:
    """Load the configuration from a JSON file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        QMessageBox.warning(None, "加载失败", f"配置加载失败: {str(e)}")
        return None

# Added: data-recording helper
def write_record_to_file(file_path: str, format_type: str, timestamp: str,
                        conn_type: str, conn_info: str, data: str, is_send: bool) -> bool:
    """Append one record to a CSV or JSON file (serial, Bluetooth, or network traffic)."""
    try:
        if format_type == "CSV":
            # Write the header only when the file is new or empty
            # (in append mode the position starts at the end of the file).
            with open(file_path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                if f.tell() == 0:
                    writer.writerow(["时间戳", "连接类型", "连接信息", "类型", "数据"])
                
                row = [timestamp, conn_type, conn_info, "发送" if is_send else "接收", data]
                writer.writerow(row)
                
        elif format_type == "JSON":
            record = {
                "timestamp": timestamp,
                "conn_type": conn_type,
                "conn_info": conn_info,
                "data_type": "发送" if is_send else "接收",
                "data": data
            }
            
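            # Load existing records; start fresh if the file is missing or not valid JSON
            records = []
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    records = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                pass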
                
            records.append(record)
            
            # 写入更新后的数据
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(records, f, ensure_ascii=False, indent=2)
                
        return True
    except Exception as e:
        QMessageBox.warning(None, "记录失败", f"数据记录失败: {str(e)}")
        return False

# ===================== Thread classes =====================
class SerialReceiveThread(QThread):
    receive_signal = pyqtSignal(bytes)
    error_signal = pyqtSignal(str)
    
    def __init__(self, serial_port: serial.Serial):
        super().__init__()
        self.serial = serial_port
        self.running = True
    
    def run(self) -> None:
        while self.running and self.serial.is_open:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    self.receive_signal.emit(data)
            except Exception as e:
                self.error_signal.emit(f"接收错误:{str(e)}")
                self.running = False
            self.msleep(10)
    
    def stop(self) -> None:
        self.running = False

class ModbusThread(QThread):
    poll_result = pyqtSignal(list, int, int)
    write_result = pyqtSignal(bool, str)
    status_update = pyqtSignal(str)
    log_update = pyqtSignal(str, str)
    
    def __init__(self, serial_port: serial.Serial):
        super().__init__()
        self.serial = serial_port
        self.running = False
        self.params = {}
        self.operation_type = "read"
        self.tx_count = 0
        self.err_count = 0
    
    def set_read_params(self, slave_id: int, func_code: int, start_addr: int, count: int, interval: int) -> None:
        self.params = {
            "slave_id": slave_id,
            "func_code": func_code,
            "start_addr": start_addr,
            "count": count,
            "interval": interval
        }
        self.operation_type = "read"
    
    def set_write_params(self, slave_id: int, func_code: int, start_addr: int, count: int, values: List[int]) -> None:
        self.params = {
            "slave_id": slave_id,
            "func_code": func_code,
            "start_addr": start_addr,
            "count": count,
            "values": values
        }
        self.operation_type = "write"
    
    def run(self) -> None:
        self.running = True
        self.tx_count = 0
        self.err_count = 0
        
        if self.operation_type == "read":
            self._run_read_loop()
        else:
            self._run_write_operation()
    
    def _run_read_loop(self) -> None:
        while self.running and self.serial.is_open:
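            try:
                result = self._perform_read()
                if result is not None:
                    self.poll_result.emit(result, self.tx_count, self.err_count)
                else:
                    # _perform_read logs the cause and returns None on failure
                    self.err_count += 1
                    self.poll_result.emit([], self.tx_count, self.err_count)
            except Exception as e:
                self.err_count += 1
                self.log_update.emit("Modbus-错误", f"读取失败:{str(e)}")
                self.poll_result.emit([], self.tx_count, self.err_count)
            
            # Sleep in 10 ms slices so stop() takes effect quickly
            for _ in range(max(1, self.params["interval"] // 10)):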
                if not self.running:
                    break
                self.msleep(10)
    
    def _run_write_operation(self) -> None:
        try:
            success, msg = self._perform_write()
            self.write_result.emit(success, msg)
        except Exception as e:
            self.write_result.emit(False, f"写入失败:{str(e)}")
    
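    # bool is a subclass of int, so List[int] covers coil results and also
    # avoids the `int | bool` syntax that needs Python 3.10+
    def _perform_read(self) -> Optional[List[int]]: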
        try:
            slave_id = self.params["slave_id"]
            func_code = self.params["func_code"]
            start_addr = self.params["start_addr"]
            count = self.params["count"]
            
            addr_bytes = start_addr.to_bytes(2, byteorder='big')
            count_bytes = count.to_bytes(2, byteorder='big')
            request = bytes([slave_id, func_code]) + addr_bytes + count_bytes
            request = _add_checksum(request, 'CRC-16')
            
            self.serial.write(request)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送", f"请求: {request.hex(' ')}")
            
            self.msleep(100)
            if self.serial.in_waiting == 0:
                raise Exception("无响应")
            
            response = self.serial.read(self.serial.in_waiting)
            self.log_update.emit("Modbus-接收", f"响应: {response.hex(' ')}")
            
            # Shortest valid frame: slave ID, function code, one byte, CRC (2 bytes)
            if len(response) < 5:
                raise Exception("响应格式错误")
                
            if response[0] != slave_id:
                raise Exception(f"从站ID不匹配 (预期: {slave_id}, 实际: {response[0]})")
                
            if response[1] & 0x80:
                raise Exception(f"从站错误: 异常码 {response[2]}")
                
            if response[1] != func_code:
                raise Exception(f"功能码不匹配 (预期: {func_code}, 实际: {response[1]})")
            
            # Full frame = 3 header bytes + byte_count data bytes + 2 CRC bytes
            byte_count = response[2]
            if len(response) != byte_count + 5:
                raise Exception("响应长度错误")
                
            data_part = response[:-2]
            crc_received = response[-2:]
            crc_calculated = modbus_crc16(data_part)
            if crc_received != crc_calculated:
                raise Exception(f"CRC校验失败 (预期: {crc_calculated.hex()}, 实际: {crc_received.hex()})")
            
            data = []
            if func_code in [1, 2]:
                for i in range(count):
                    byte_idx = i // 8
                    bit_idx = i % 8
                    if byte_idx < byte_count:
                        data.append(bool((response[3 + byte_idx] >> bit_idx) & 0x01))
            elif func_code in [3, 4]:
                for i in range(count):
                    if 3 + i * 2 + 1 < len(response) - 2:
                        val = (response[3 + i * 2] << 8) | response[3 + i * 2 + 1]
                        data.append(val)
            
            return data[:count]
        except Exception as e:
            self.log_update.emit("Modbus-错误", str(e))
            return None
    
    def _perform_write(self) -> Tuple[bool, str]:
        try:
            slave_id = self.params["slave_id"]
            func_code = self.params["func_code"]
            start_addr = self.params["start_addr"]
            count = self.params["count"]
            values = self.params["values"]
            
            request = bytes([slave_id, func_code])
            request += start_addr.to_bytes(2, byteorder='big')
            
            if func_code == 5:
                value = 0xFF00 if values[0] else 0x0000
                request += value.to_bytes(2, byteorder='big')
            elif func_code == 6:
                request += values[0].to_bytes(2, byteorder='big')
            elif func_code == 15:
                request += count.to_bytes(2, byteorder='big')
                byte_count = (count + 7) // 8
                request += bytes([byte_count])
                coil_bytes = bytearray(byte_count)
                # Pack at most `count` coils so extra values cannot index past coil_bytes
                for i, val in enumerate(values[:count]):
                    byte_idx = i // 8
                    bit_idx = i % 8
                    if val:
                        coil_bytes[byte_idx] |= (1 << bit_idx)
                request += coil_bytes
            elif func_code == 16:
                request += count.to_bytes(2, byteorder='big')
                request += bytes([count * 2])
                for val in values:
                    request += val.to_bytes(2, byteorder='big')
            
            request = _add_checksum(request, 'CRC-16')
            
            self.serial.write(request)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送", f"请求: {request.hex(' ')}")
            
            self.msleep(100)
            if self.serial.in_waiting == 0:
                raise Exception("无响应")
            
            response = self.serial.read(self.serial.in_waiting)
            self.log_update.emit("Modbus-接收", f"响应: {response.hex(' ')}")
            
            if len(response) < 5:
                raise Exception("响应格式错误")
                
            if response[0] != slave_id:
                raise Exception(f"从站ID不匹配 (预期: {slave_id}, 实际: {response[0]})")
                
            if response[1] & 0x80:
                raise Exception(f"从站错误: 异常码 {response[2]}")
                
            if response[1] != func_code:
                raise Exception(f"功能码不匹配 (预期: {func_code}, 实际: {response[1]})")
            
            data_part = response[:-2]
            crc_received = response[-2:]
            crc_calculated = modbus_crc16(data_part)
            if crc_received != crc_calculated:
                raise Exception(f"CRC校验失败 (预期: {crc_calculated.hex()}, 实际: {crc_received.hex()})")
            
            return True, f"写入成功: 从站{slave_id}, 功能码{func_code}, 地址{start_addr}, 数量{count}"
        except Exception as e:
            return False, str(e)
    
    def stop(self) -> None:
        self.running = False

# New: data-recording thread
class DataRecordThread(QThread):
    record_signal = pyqtSignal(bool, str)
    
    def __init__(self, file_path: str, format_type: str, interval: int):
        super().__init__()
        self.file_path = file_path
        self.format_type = format_type
        self.interval = interval
        self.running = False
        self.data_buffer = None
        
    def set_data(self, conn_type: str, conn_info: str, data: str, is_send: bool) -> None:
        """Set the data to be recorded."""
        self.data_buffer = (conn_type, conn_info, data, is_send)
        
    def run(self) -> None:
        self.running = True
        while self.running:
            try:
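                # Take the pending record and clear the buffer so it is written only once
                pending, self.data_buffer = self.data_buffer, None
                if pending:
                    conn_type, conn_info, data, is_send = pending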
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    success = write_record_to_file(
                        self.file_path, 
                        self.format_type, 
                        timestamp,
                        conn_type,
                        conn_info,
                        data,
                        is_send
                    )
                    if not success:
                        self.record_signal.emit(False, "数据记录失败")
                
                # Wait for the configured interval
                for _ in range(self.interval // 10):
                    if not self.running:
                        break
                    self.msleep(10)
            except Exception as e:
                self.record_signal.emit(False, f"记录线程错误: {str(e)}")
                self.running = False
    
    def stop(self) -> None:
        self.running = False

# New: Bluetooth scan thread
class BluetoothScanThread(QThread):
    device_found = pyqtSignal(str, str)  # device name, device address
    scan_finished = pyqtSignal()
    scan_error = pyqtSignal(str)
    
    def __init__(self):
        super().__init__()
        self.running = False
        self.found_devices = set()
    
    def run(self):
        self.running = True
        self.found_devices.clear()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Pass the callback to the constructor; register_detection_callback()
            # is deprecated in Bleak
            scanner = BleakScanner(detection_callback=self._detection_callback)
            loop.run_until_complete(scanner.start())
            loop.run_until_complete(asyncio.sleep(5))  # scan for 5 seconds
            loop.run_until_complete(scanner.stop())
        except Exception as e:
            self.scan_error.emit(str(e))
        finally:
            loop.close()
            self.scan_finished.emit()
    
    def _detection_callback(self, device, advertisement_data):
        if self.running and device.address not in self.found_devices:
            self.found_devices.add(device.address)
            device_name = device.name if device.name else "未知设备"
            self.device_found.emit(device_name, device.address)
    
    def stop(self):
        self.running = False

# New: Bluetooth receive thread
class BluetoothReceiveThread(QThread):
    data_received = pyqtSignal(bytes)
    error_occurred = pyqtSignal(str)
    
    def __init__(self, client: BleakClient, char_uuid: str):
        super().__init__()
        self.client = client
        self.char_uuid = char_uuid
        self.running = False
        self.loop = asyncio.new_event_loop()
    
    def run(self):
        self.running = True
        asyncio.set_event_loop(self.loop)
        try:
            while self.running and self.client.is_connected:
                try:
                    data = self.loop.run_until_complete(self.client.read_gatt_char(self.char_uuid))
                    if data:
                        self.data_received.emit(data)
                except Exception as e:
                    self.error_occurred.emit(f"接收错误: {str(e)}")
                self.msleep(10)
        except Exception as e:
            self.error_occurred.emit(f"线程错误: {str(e)}")
        finally:
            self.loop.close()
    
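    def stop(self):
        # Only clear the flag: asyncio loops are not thread-safe, and run()
        # closes the loop itself once the receive loop exits
        self.running = False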

# New: network receive thread (TCP client)
class TcpClientReceiveThread(QThread):
    data_received = pyqtSignal(bytes)
    connection_closed = pyqtSignal()
    error_occurred = pyqtSignal(str)
    
    def __init__(self, sock: socket.socket):
        super().__init__()
        self.sock = sock
        self.running = False
    
    def run(self):
        self.running = True
        self.sock.settimeout(1.0)
        while self.running:
            try:
                data = self.sock.recv(1024)
                if not data:
                    break
                self.data_received.emit(data)
            except socket.timeout:
                continue
            except Exception as e:
                self.error_occurred.emit(f"接收错误: {str(e)}")
                break
        self.connection_closed.emit()
    
    def stop(self):
        self.running = False

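# New: TCP server thread
class TcpServerThread(QThread):
    # pyqtSignal expects plain Python types, so use object/tuple rather than typing generics
    client_connected = pyqtSignal(object, tuple)  # client socket, (host, port)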
    connection_error = pyqtSignal(str)
    
    def __init__(self, host: str, port: int):
        super().__init__()
        self.host = host
        self.port = port
        self.running = False
        self.server_sock = None
    
    def run(self):
        self.running = True
        try:
            self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_sock.bind((self.host, self.port))
            self.server_sock.listen(1)
            self.server_sock.settimeout(1.0)
            
            while self.running:
                try:
                    client_sock, addr = self.server_sock.accept()
                    if self.running:
                        self.client_connected.emit(client_sock, addr)
                    else:
                        client_sock.close()
                except socket.timeout:
                    continue
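                except Exception as e:
                    # stop() closes the listening socket, so ignore the resulting error during shutdown
                    if self.running:
                        self.connection_error.emit(f"服务器错误: {str(e)}")
                    break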
        except Exception as e:
            self.connection_error.emit(f"绑定端口失败: {str(e)}")
        finally:
            if self.server_sock:
                self.server_sock.close()
    
    def stop(self):
        self.running = False
        if self.server_sock:
            self.server_sock.close()

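# New: UDP receive thread
class UdpReceiveThread(QThread):
    # pyqtSignal expects plain Python types, so use tuple rather than typing.Tuple[...]
    data_received = pyqtSignal(bytes, tuple)  # payload, (host, port)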
    error_occurred = pyqtSignal(str)
    
    def __init__(self, sock: socket.socket):
        super().__init__()
        self.sock = sock
        self.running = False
    
    def run(self):
        self.running = True
        self.sock.settimeout(1.0)
        while self.running:
            try:
                data, addr = self.sock.recvfrom(1024)
                if data:
                    self.data_received.emit(data, addr)
            except socket.timeout:
                continue
            except Exception as e:
                self.error_occurred.emit(f"接收错误: {str(e)}")
                break
    
    def stop(self):
        self.running = False

# ===================== Main window (final display tuning) =====================
class ModbusRTUMainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("多功能串口调试工具(增强版)")
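        # Clamp the window to the available screen area so it also fits smaller displays
        screen = QApplication.primaryScreen().availableGeometry()
        self.setGeometry(100, 100, min(1600, screen.width()), min(1200, screen.height()))
        self.setMinimumSize(min(1300, screen.width()), min(1000, screen.height()))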
        
        self._setup_global_style()
        
        # Serial-port state
        self.serial: serial.Serial = serial.Serial()
        self.receive_thread: Optional[SerialReceiveThread] = None
        self.modbus_thread: Optional[ModbusThread] = None
        
        # Bluetooth state
        self.bluetooth_client: Optional[BleakClient] = None
        self.bluetooth_scan_thread: Optional[BluetoothScanThread] = None
        self.bluetooth_receive_thread: Optional[BluetoothReceiveThread] = None
        self.bluetooth_devices = {}  # {address: name}
        self.is_bluetooth_connected = False
        self.bluetooth_write_char = BLUETOOTH_CHARACTERISTIC_UUID
        
        # Network state
        self.network_socket: Optional[socket.socket] = None
        self.tcp_server_thread: Optional[TcpServerThread] = None
        self.tcp_client_receive_thread: Optional[TcpClientReceiveThread] = None
        self.udp_receive_thread: Optional[UdpReceiveThread] = None
        self.network_protocol = "TCP客户端"
        self.client_socket: Optional[socket.socket] = None
        self.client_addr: Optional[Tuple[str, int]] = None
        self.is_network_connected = False
        
        # Data-recording state
        self.record_thread: Optional[DataRecordThread] = None
        self.is_recording = False
        self.record_file_path = ""
        self.record_format = "CSV"
        self.record_interval = 1000
        
        # Other initialization
        self.loop_send_timer = QTimer()
        self.loop_send_timer.timeout.connect(self.send_data)
        self.port_scan_timer = QTimer()
        self.port_scan_timer.timeout.connect(self.scan_serial_ports)
        self.port_scan_timer.start(2000)
        
        self.is_serial_open: bool = False
        self.is_loop_sending: bool = False
        self.display_mode: str = "dec"
        
        # Byte counters (per connection type)
        self.serial_rx_bytes = 0
        self.serial_tx_bytes = 0
        self.bluetooth_rx_bytes = 0
        self.bluetooth_tx_bytes = 0
        self.network_rx_bytes = 0
        self.network_tx_bytes = 0
        
        self._init_ui()
        self.activateWindow()
        self.raise_()
        self.show()
    
    def _setup_global_style(self):
        # Global palette
        palette = QPalette()
        palette.setColor(QPalette.Window, QColor(240, 242, 245))
        palette.setColor(QPalette.WindowText, QColor(40, 44, 52))
        palette.setColor(QPalette.Base, QColor(250, 251, 252))
        palette.setColor(QPalette.AlternateBase, QColor(235, 237, 240))
        palette.setColor(QPalette.Button, QColor(230, 233, 236))
        palette.setColor(QPalette.ButtonText, QColor(40, 44, 52))
        palette.setColor(QPalette.Highlight, QColor(76, 145, 226))
        palette.setColor(QPalette.HighlightedText, QColor(255, 255, 255))
        self.setPalette(palette)
        
        # Style sheet (sized to prevent truncated text)
        self.setStyleSheet("""
            QMainWindow {background: #F0F2F5;}
            QGroupBox {
                font-weight: bold; 
                font-size: 14px;
                border: 1px solid #CED4DA; 
                border-radius: 8px; 
                margin-top: 30px;  /* large top margin so content never covers the title */
                padding: 30px 20px 20px 20px;  /* larger padding keeps content away from the border */
                background: #FFFFFF;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                subcontrol-position: top left;
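                left: 30px;  /* offset the title so it does not hug the edge */
                top: -18px;  /* raise the title clear of the content area */
                padding: 0 20px;  /* horizontal padding so the title text is not squeezed */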
                background: #F0F2F5;
                font-family: SimHei;
                font-weight: bold;
                font-size: 14px;
                min-width: 200px;  /* minimum title width so it is shown in full */
            }
            QPushButton {
                background: #E8ECEF; 
                border: 1px solid #CED4DA; 
                border-radius: 6px; 
                padding: 10px 20px;
                font-size: 12px;
                min-height: 36px;
                min-width: 130px;  /* minimum button width so the text does not wrap */
                font-family: SimHei;
                font-weight: bold;
            }
            QPushButton:hover {
                background: #D8DFE6;
                border-color: #ADB5BD;
            }
            QPushButton:disabled {
                background: #F5F7F9; 
                color: #ADB5BD;
                border-color: #E2E6EA;
            }
            QPushButton#primaryBtn {
                background: #4A90E2;
                color: white;
                border: none;
                min-height: 38px;
            }
            QPushButton#primaryBtn:hover {
                background: #357ABD;
            }
            QLabel {
                font-size: 12px; 
                color: #495057;
                min-height: 34px;
                min-width: 100px;  /* minimum label width so Chinese text is not truncated */
                font-family: SimHei;
                font-weight: bold;
            }
            QTableWidget {
                gridline-color: #E9ECEF; 
                font-family: Consolas; 
                font-size: 11px;
                border: 1px solid #E9ECEF;
                border-radius: 6px;
                min-height: 250px;
                alternate-background-color: #F8F9FA;
                font-weight: bold;
            }
            QTableWidget::item:selected {
                background: #E1F0FF;
                color: #212529;
            }
            QTableWidget QHeaderView::section {
                background: #E9ECEF;
                border: 1px solid #CED4DA;
                padding: 10px;
                font-weight: bold;
                font-size: 12px;
                color: #495057;
                font-family: SimHei;
                min-width: 80px;  /* minimum header-section width */
            }
            QStatusBar {
                font-size: 11px; 
                color: #495057; 
                background: #E9ECEF;
                border-top: 1px solid #CED4DA;
                padding: 2px 15px;
                font-family: SimHei;
                font-weight: bold;
            }
            QLineEdit, QComboBox, QTextEdit {
                border: 1px solid #CED4DA;
                border-radius: 6px;
                padding: 8px 12px;
                font-size: 12px;
                min-height: 36px;
                min-width: 120px;  /* minimum input width so text is not truncated */
                background: #FFFFFF;
                font-family: SimHei;
                font-weight: bold;
            }
            QLineEdit:focus, QComboBox:focus, QTextEdit:focus {
                border: 1px solid #4A90E2;
                outline: none;
                background: #FFFFFF;
            }
            QComboBox::drop-down {
                border-left: 1px solid #CED4DA;
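                /* Qt's border-radius takes at most two values, so round the right corners individually */
                border-top-right-radius: 6px;
                border-bottom-right-radius: 6px;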
            }
            QComboBox QAbstractItemView {
                font-family: SimHei;
                font-size: 12px;
                font-weight: bold;
                padding: 5px;
                min-width: 200px;  /* minimum width for drop-down items */
            }
            QCheckBox {
                font-size: 12px;
                color: #495057;
                min-height: 36px;
                min-width: 120px;  /* minimum checkbox width so the text does not wrap */
                spacing: 10px;
                font-family: SimHei;
                font-weight: bold;
            }
            QTabWidget::pane {
                border: 1px solid #CED4DA;
                border-radius: 6px;
                background: #FFFFFF;
                padding: 10px;
            }
            QTabBar::tab {
                background: #E9ECEF;
                border: 1px solid #CED4DA;
                border-bottom-color: #CED4DA;
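                /* Qt's border-radius takes at most two values, so round the top corners individually */
                border-top-left-radius: 6px;
                border-top-right-radius: 6px;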
                padding: 8px 20px;
                font-family: SimHei;
                font-weight: bold;
                margin-right: 2px;
            }
            QTabBar::tab:selected {
                background: #FFFFFF;
                border-color: #CED4DA;
                border-bottom-color: #FFFFFF;
            }
        """)
    
    def _init_ui(self) -> None:
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)
        main_layout.setSpacing(25)  # generous spacing so widgets never crowd each other
        main_layout.setContentsMargins(25, 25, 25, 25)
        
        # New: use a QTabWidget to organize all functional areas
        main_tab = QTabWidget()
        main_tab.setFont(FONT_CONFIG["subtitle"])
        
        # Original serial-port tab
        original_tab = QWidget()
        original_layout = QVBoxLayout(original_tab)
        original_layout.setSpacing(25)
        original_layout.setContentsMargins(0, 0, 0, 0)
        
        original_layout.addWidget(self._build_serial_config_widget())
        original_layout.addWidget(self._build_modbus_config_widget())
        original_layout.addWidget(self._build_io_widget("serial"), stretch=1)
        
        main_tab.addTab(original_tab, "普通串口")
        
        # New: Bluetooth serial tab
        bluetooth_tab = QWidget()
        bluetooth_layout = QVBoxLayout(bluetooth_tab)
        bluetooth_layout.setSpacing(25)
        bluetooth_layout.setContentsMargins(0, 0, 0, 0)
        
        bluetooth_layout.addWidget(self._build_bluetooth_config_widget())
        bluetooth_layout.addWidget(self._build_io_widget("bluetooth"), stretch=1)
        
        main_tab.addTab(bluetooth_tab, "蓝牙串口")
        
        # New: network serial tab
        network_tab = QWidget()
        network_layout = QVBoxLayout(network_tab)
        network_layout.setSpacing(25)
        network_layout.setContentsMargins(0, 0, 0, 0)
        
        network_layout.addWidget(self._build_network_config_widget())
        network_layout.addWidget(self._build_io_widget("network"), stretch=1)
        
        main_tab.addTab(network_tab, "网络串口")
        
        # Existing: data-recording tab
        record_tab = QWidget()
        record_layout = QVBoxLayout(record_tab)
        record_layout.setSpacing(25)
        record_layout.setContentsMargins(0, 0, 0, 0)
        record_layout.addWidget(self._build_data_record_widget())
        main_tab.addTab(record_tab, "数据记录")
        
        # Existing: configuration-management tab
        config_tab = QWidget()
        config_layout = QVBoxLayout(config_tab)
        config_layout.setSpacing(25)
        config_layout.setContentsMargins(0, 0, 0, 0)
        config_layout.addWidget(self._build_config_management_widget())
        main_tab.addTab(config_tab, "配置管理")
        
        main_layout.addWidget(main_tab)
        
        # Status bar
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        self.status_bar.setFont(FONT_CONFIG["status"])
        self.update_status_bar("serial")
        
        # Connect the tab-change signal
        main_tab.currentChanged.connect(self.on_tab_changed)
    
    # ===================== Serial configuration group =====================
    def _build_serial_config_widget(self) -> QGroupBox:
        group = QGroupBox("串口配置 | Serial Configuration")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        layout.setHorizontalSpacing(25)  # generous horizontal spacing
        layout.setVerticalSpacing(20)    # generous vertical spacing
        layout.setColumnStretch(1, 3)    # give the port combo box more width
        layout.setColumnStretch(3, 2)
        
        # Labels and controls (with enforced minimum widths)
        layout.addWidget(QLabel("串口 Port:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.serial_combo = QComboBox()
        self.serial_combo.setMinimumWidth(300)  # extra-wide port combo box to avoid truncated text
        self.serial_combo.setFont(FONT_CONFIG["mono"])
        self.serial_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        layout.addWidget(self.serial_combo, 0, 1)
        
        layout.addWidget(QLabel("波特率 Baud:"), 0, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.baudrate_combo = QComboBox()
        self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800", "2000000"])
        self.baudrate_combo.setCurrentText("9600")
        self.baudrate_combo.setMinimumWidth(180)  # widen the baud-rate combo box
        layout.addWidget(self.baudrate_combo, 0, 3)
        
        layout.addWidget(QLabel("数据位 Data:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.databits_combo = QComboBox()
        self.databits_combo.addItems(["5", "6", "7", "8"])
        self.databits_combo.setCurrentText("8")
        self.databits_combo.setMinimumWidth(180)
        layout.addWidget(self.databits_combo, 1, 1)
        
        layout.addWidget(QLabel("停止位 Stop:"), 1, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.stopbits_combo = QComboBox()
        self.stopbits_combo.addItems(["1", "1.5", "2"])
        self.stopbits_combo.setCurrentText("1")
        self.stopbits_combo.setMinimumWidth(180)
        layout.addWidget(self.stopbits_combo, 1, 3)
        
        layout.addWidget(QLabel("校验位 Parity:"), 2, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.parity_combo = QComboBox()
        self.parity_combo.addItems(["无 None", "奇校验 Odd", "偶校验 Even"])
        self.parity_combo.setCurrentText("无 None")
        self.parity_combo.setMinimumWidth(180)
        layout.addWidget(self.parity_combo, 2, 1)
        
        layout.addWidget(QLabel("流控 Flow:"), 2, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.rts_cts_check = QCheckBox("RTS/CTS")
        self.rts_cts_check.setMinimumHeight(36)
        self.rts_cts_check.setMinimumWidth(120)
        layout.addWidget(self.rts_cts_check, 2, 3)
        
        # Open-port button (extra wide)
        self.serial_open_close_btn = QPushButton("打开串口 Open")
        self.serial_open_close_btn.setObjectName("primaryBtn")
        self.serial_open_close_btn.setMinimumWidth(200)
        self.serial_open_close_btn.setMinimumHeight(38)
        self.serial_open_close_btn.clicked.connect(lambda: self.toggle_connection("serial"))
        layout.addWidget(self.serial_open_close_btn, 0, 4, 3, 1, Qt.AlignCenter)
        
        return group
    
    # New: Bluetooth configuration group
    def _build_bluetooth_config_widget(self) -> QGroupBox:
        group = QGroupBox("蓝牙设置 | Bluetooth Configuration")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        layout.setHorizontalSpacing(25)
        layout.setVerticalSpacing(20)
        
        # Bluetooth device selection
        layout.addWidget(QLabel("设备 Device:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.bluetooth_device_combo = QComboBox()
        self.bluetooth_device_combo.setMinimumWidth(300)
        self.bluetooth_device_combo.setFont(FONT_CONFIG["mono"])
        layout.addWidget(self.bluetooth_device_combo, 0, 1)
        
        # Scan button
        self.bluetooth_scan_btn = QPushButton("扫描设备 Scan")
        self.bluetooth_scan_btn.setMinimumWidth(150)
        self.bluetooth_scan_btn.setMinimumHeight(38)
        self.bluetooth_scan_btn.clicked.connect(self.scan_bluetooth_devices)
        layout.addWidget(self.bluetooth_scan_btn, 0, 2)
        
        # Connect button
        self.bluetooth_conn_btn = QPushButton("连接 Connect")
        self.bluetooth_conn_btn.setObjectName("primaryBtn")
        self.bluetooth_conn_btn.setMinimumWidth(200)
        self.bluetooth_conn_btn.setMinimumHeight(38)
        self.bluetooth_conn_btn.clicked.connect(lambda: self.toggle_connection("bluetooth"))
        layout.addWidget(self.bluetooth_conn_btn, 1, 0, 1, 3, Qt.AlignCenter)
        
        # Connection status
        self.bluetooth_status_label = QLabel("未连接 | Disconnected")
        self.bluetooth_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
        layout.addWidget(self.bluetooth_status_label, 2, 0, 1, 3, Qt.AlignCenter)
        
        return group
    
    # New: network configuration group
    def _build_network_config_widget(self) -> QGroupBox:
        group = QGroupBox("网络设置 | Network Configuration")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        layout.setHorizontalSpacing(25)
        layout.setVerticalSpacing(20)
        
        # Protocol selection
        layout.addWidget(QLabel("协议 Protocol:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.network_protocol_combo = QComboBox()
        self.network_protocol_combo.addItems(NETWORK_PROTOCOLS)
        self.network_protocol_combo.setCurrentText("TCP客户端")
        self.network_protocol_combo.setMinimumWidth(180)
        self.network_protocol_combo.currentTextChanged.connect(self.on_network_protocol_changed)
        layout.addWidget(self.network_protocol_combo, 0, 1)
        
        # IP address
        layout.addWidget(QLabel("IP地址 IP:"), 0, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.network_ip_edit = QLineEdit("127.0.0.1")
        self.network_ip_edit.setMinimumWidth(180)
        layout.addWidget(self.network_ip_edit, 0, 3)
        
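        # Port
        layout.addWidget(QLabel("端口 Port:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
        # Note: QSpinBox must be imported from PyQt5.QtWidgets; it is not in the import list at the top of the file
        self.network_port_spin = QSpinBox()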
        self.network_port_spin.setRange(1, 65535)
        self.network_port_spin.setValue(8080)
        self.network_port_spin.setMinimumWidth(180)
        layout.addWidget(self.network_port_spin, 1, 1)
        
        # Connect button
        self.network_conn_btn = QPushButton("连接 Connect")
        self.network_conn_btn.setObjectName("primaryBtn")
        self.network_conn_btn.setMinimumWidth(200)
        self.network_conn_btn.setMinimumHeight(38)
        self.network_conn_btn.clicked.connect(lambda: self.toggle_connection("network"))
        layout.addWidget(self.network_conn_btn, 1, 2, 1, 2, Qt.AlignCenter)
        
        # Connection status
        self.network_status_label = QLabel("未连接 | Disconnected")
        self.network_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
        layout.addWidget(self.network_status_label, 2, 0, 1, 4, Qt.AlignCenter)
        
        return group
    
    # ===================== Modbus configuration group =====================
    def _build_modbus_config_widget(self) -> QGroupBox:
        group = QGroupBox("Modbus 配置 | RTU Master(01/02/03/04/05/06/15/16)")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        layout.setHorizontalSpacing(25)
        layout.setVerticalSpacing(20)
        for col in range(8):
            layout.setColumnStretch(col, 1)
        
        self.modbus_status_label = QLabel("Disconnected")
        self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
        layout.addWidget(self.modbus_status_label, 0, 0, 1, 2)
        
        status_widget = QWidget()
        status_layout = QHBoxLayout(status_widget)
        status_layout.setSpacing(30)  # wider spacing between status items
        status_layout.setContentsMargins(0,0,0,0)
        
        self.tx_label = QLabel("Tx = 0")
        self.tx_label.setStyleSheet("color: #2980B9; font-weight: bold; font-size: 12px;")
        self.err_label = QLabel("Err = 0")
        self.err_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 12px;")
        self.id_label = QLabel("ID = 1")
        self.f_label = QLabel("F = 03")
        self.sr_label = QLabel("SR = 1000ms")
        
        for lbl in [self.tx_label, self.err_label, self.id_label, self.f_label, self.sr_label]:
            lbl.setFont(FONT_CONFIG["subtitle"])
            lbl.setMinimumHeight(36)
            lbl.setMinimumWidth(80)  # enforce a minimum width for status labels
            status_layout.addWidget(lbl)
        status_layout.addStretch()
        
        # Starts at column 2 of an 8-column grid, so it can span at most 6 columns
        layout.addWidget(status_widget, 0, 2, 1, 6)
        
        # Input fields (all widened)
        layout.addWidget(QLabel("从站地址 ID:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.slave_id_edit = QLineEdit("1")
        self.slave_id_edit.setMinimumWidth(120)
        self.slave_id_edit.setMinimumHeight(36)
        self.slave_id_edit.setFont(FONT_CONFIG["mono"])
        layout.addWidget(self.slave_id_edit, 1, 1)
        
        layout.addWidget(QLabel("功能码 Func:"), 1, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.func_code_combo = QComboBox()
        self.func_code_combo.setMinimumWidth(250)  # extra-wide combo box so the long function names fit
        self.func_code_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        for code, (name, _, desc, _) in FUNC_CODE_MAP.items():
            self.func_code_combo.addItem(f"{code:02d} {name} ({desc})", code)
        self.func_code_combo.currentIndexChanged.connect(self._on_func_code_changed)
        layout.addWidget(self.func_code_combo, 1, 3)
        
        layout.addWidget(QLabel("起始地址 Addr:"), 1, 4, Qt.AlignRight | Qt.AlignVCenter)
        self.start_addr_edit = QLineEdit("0")
        self.start_addr_edit.setMinimumWidth(120)
        self.start_addr_edit.setMinimumHeight(36)
        self.start_addr_edit.setFont(FONT_CONFIG["mono"])
        layout.addWidget(self.start_addr_edit, 1, 5)
        
        layout.addWidget(QLabel("数量 Qty:"), 1, 6, Qt.AlignRight | Qt.AlignVCenter)
        self.count_edit = QLineEdit("10")
        self.count_edit.setMinimumWidth(120)
        self.count_edit.setMinimumHeight(36)
        self.count_edit.setFont(FONT_CONFIG["mono"])
        layout.addWidget(self.count_edit, 1, 7)
        
        layout.addWidget(QLabel("扫描周期 MS:"), 2, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.interval_edit = QLineEdit("1000")
        self.interval_edit.setMinimumWidth(140)
        self.interval_edit.setMinimumHeight(36)
        self.interval_edit.setFont(FONT_CONFIG["mono"])
        layout.addWidget(self.interval_edit, 2, 1)
        
        layout.addWidget(QLabel("写操作值 Value:"), 2, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.write_value_edit = QLineEdit()
        self.write_value_edit.setMinimumWidth(300)  # extra-wide input for write values
        self.write_value_edit.setMinimumHeight(36)
        self.write_value_edit.setFont(FONT_CONFIG["mono"])
        self.write_value_edit.setPlaceholderText("线圈:0/1/True/False | 寄存器:0-65535 | 多值用逗号分隔")
        self.write_value_edit.setEnabled(False)
        layout.addWidget(self.write_value_edit, 2, 3, 1, 3)
        
        layout.addWidget(QLabel("显示模式:"), 2, 6, Qt.AlignRight | Qt.AlignVCenter)
        self.display_mode_combo = QComboBox()
        self.display_mode_combo.setMinimumWidth(150)
        self.display_mode_combo.addItems(DISPLAY_MODE_MAP.keys())
        self.display_mode_combo.currentTextChanged.connect(self._on_display_mode_changed)
        layout.addWidget(self.display_mode_combo, 2, 7)
        
        # Buttons (all widened)
        self.modbus_btn = QPushButton("开始轮询 Start Poll")
        self.modbus_btn.setObjectName("primaryBtn")
        self.modbus_btn.setMinimumWidth(200)
        self.modbus_btn.setMinimumHeight(38)
        self.modbus_btn.clicked.connect(self.toggle_modbus_operation)
        self.modbus_btn.setEnabled(False)
        layout.addWidget(self.modbus_btn, 3, 0, 1, 2)
        
        self.clear_modbus_btn = QPushButton("清空数据 Clear")
        self.clear_modbus_btn.setMinimumWidth(180)
        self.clear_modbus_btn.setMinimumHeight(36)
        self.clear_modbus_btn.clicked.connect(self.clear_modbus_table)
        layout.addWidget(self.clear_modbus_btn, 3, 2, 1, 2)
        
        self.clear_modbus_log_btn = QPushButton("清空日志 Clear Log")
        self.clear_modbus_log_btn.setMinimumWidth(180)
        self.clear_modbus_log_btn.setMinimumHeight(36)
        self.clear_modbus_log_btn.clicked.connect(self.clear_receive)
        layout.addWidget(self.clear_modbus_log_btn, 3, 4, 1, 2)
        
        self.manual_write_btn = QPushButton("执行写操作 Write")
        self.manual_write_btn.setObjectName("primaryBtn")
        self.manual_write_btn.setMinimumWidth(200)
        self.manual_write_btn.setMinimumHeight(38)
        self.manual_write_btn.clicked.connect(self.execute_manual_write)
        self.manual_write_btn.setEnabled(False)
        layout.addWidget(self.manual_write_btn, 3, 6, 1, 2)
        
        # Table (extra wide and tall)
        self.modbus_table = QTableWidget()
        self.modbus_table.setColumnCount(2)
        self.modbus_table.setHorizontalHeaderLabels(["Address (Dec)", "Value"])
        self.modbus_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
        self.modbus_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.modbus_table.setRowCount(10)
        self.modbus_table.setAlternatingRowColors(True)
        self.modbus_table.setMinimumHeight(300)  # taller table
        self.modbus_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        self._init_modbus_table()
        layout.addWidget(self.modbus_table, 4, 0, 1, 8)
        
        return group
    
    # ===================== Send/receive group (serial/Bluetooth/network) =====================
    def _build_io_widget(self, conn_type: str) -> QWidget:
        """
        Build the data send/receive area.
        conn_type: serial/bluetooth/network
        """
        widget = QWidget()
        layout = QHBoxLayout(widget)
        layout.setSpacing(25)
        layout.setStretch(0, 8)  # receive log area gets the larger share
        layout.setStretch(1, 2)
        
        # Receive log group
        receive_group = QGroupBox(f"接收日志 | Receive Log ({conn_type.upper()})")
        receive_group.setFont(FONT_CONFIG["title"])
        receive_layout = QVBoxLayout(receive_group)
        receive_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        
        receive_ctrl = QHBoxLayout()
        receive_ctrl.setSpacing(20)  # wider spacing between controls
        
        # Create the controls dynamically and store them as instance attributes
        hex_check_name = f"{conn_type}_hex_receive_check"
        timestamp_check_name = f"{conn_type}_timestamp_check"
        auto_wrap_check_name = f"{conn_type}_auto_wrap_check"
        auto_scroll_check_name = f"{conn_type}_auto_scroll_check"
        receive_text_name = f"{conn_type}_receive_text"
        clear_receive_btn_name = f"{conn_type}_clear_receive_btn"
        byte_count_label_name = f"{conn_type}_byte_count_label"
        
        # Create the receive controls
        hex_check = QCheckBox("十六进制显示 HEX")
        timestamp_check = QCheckBox("显示时间戳 Time")
        timestamp_check.setChecked(True)
        auto_wrap_check = QCheckBox("自动换行 Wrap")
        auto_wrap_check.setChecked(True)
        auto_scroll_check = QCheckBox("自动滚屏 Scroll")
        auto_scroll_check.setChecked(True)
        
        # Store them as instance attributes
        setattr(self, hex_check_name, hex_check)
        setattr(self, timestamp_check_name, timestamp_check)
        setattr(self, auto_wrap_check_name, auto_wrap_check)
        setattr(self, auto_scroll_check_name, auto_scroll_check)
        
        for cb in [hex_check, timestamp_check, auto_wrap_check, auto_scroll_check]:
            cb.setMinimumHeight(36)
            cb.setMinimumWidth(150)  # enforce a minimum width for the check boxes
        
        receive_ctrl.addWidget(hex_check)
        receive_ctrl.addWidget(timestamp_check)
        receive_ctrl.addWidget(auto_wrap_check)
        receive_ctrl.addWidget(auto_scroll_check)
        receive_ctrl.addStretch(1)
        
        # Clear-receive button
        clear_receive_btn = QPushButton("清空接收 Clear")
        clear_receive_btn.setMinimumWidth(150)
        clear_receive_btn.setMinimumHeight(36)
        clear_receive_btn.clicked.connect(lambda: getattr(self, receive_text_name).clear())
        setattr(self, clear_receive_btn_name, clear_receive_btn)
        receive_ctrl.addWidget(clear_receive_btn)
        
        receive_layout.addLayout(receive_ctrl)
        
        # Receive text box
        receive_text = QTextEdit()
        receive_text.setReadOnly(True)
        receive_text.setFont(FONT_CONFIG["mono"])
        receive_text.setLineWrapMode(QTextEdit.WidgetWidth if auto_wrap_check.isChecked() else QTextEdit.NoWrap)
        receive_text.setMinimumHeight(350)  # taller receive box
        receive_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        auto_wrap_check.clicked.connect(lambda: receive_text.setLineWrapMode(
            QTextEdit.WidgetWidth if auto_wrap_check.isChecked() else QTextEdit.NoWrap))
        setattr(self, receive_text_name, receive_text)
        receive_layout.addWidget(receive_text)
        
        # Byte counter label
        byte_count_label = QLabel("接收: 0 字节 | 发送: 0 字节")
        byte_count_label.setAlignment(Qt.AlignRight)
        byte_count_label.setFont(FONT_CONFIG["subtitle"])
        byte_count_label.setMinimumHeight(36)
        byte_count_label.setMinimumWidth(300)  # wider statistics label
        setattr(self, byte_count_label_name, byte_count_label)
        receive_layout.addWidget(byte_count_label)
        
        # Send data group
        send_group = QGroupBox(f"发送数据 | Transmit Data ({conn_type.upper()})")
        send_group.setFont(FONT_CONFIG["title"])
        send_layout = QVBoxLayout(send_group)
        send_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        
        send_ctrl = QHBoxLayout()
        send_ctrl.setSpacing(15)
        
        # Create the send controls dynamically
        hex_send_check_name = f"{conn_type}_hex_send_check"
        send_newline_check_name = f"{conn_type}_send_newline_check"
        auto_send_check_name = f"{conn_type}_auto_send_check"
        send_interval_spin_name = f"{conn_type}_send_interval_spin"
        send_btn_name = f"{conn_type}_send_btn"
        clear_send_btn_name = f"{conn_type}_clear_send_btn"
        send_text_name = f"{conn_type}_send_text"
        
        hex_send_check = QCheckBox("HEX发送")
        hex_send_check.setMinimumHeight(36)
        send_newline_check = QCheckBox("发送新行")
        send_newline_check.setMinimumHeight(36)
        auto_send_check = QCheckBox("自动发送")
        auto_send_check.setMinimumHeight(36)
        
        setattr(self, hex_send_check_name, hex_send_check)
        setattr(self, send_newline_check_name, send_newline_check)
        setattr(self, auto_send_check_name, auto_send_check)
        
        send_ctrl.addWidget(hex_send_check)
        send_ctrl.addWidget(send_newline_check)
        send_ctrl.addWidget(auto_send_check)
        send_ctrl.addWidget(QLabel("间隔(ms):"))
        
        send_interval_spin = QSpinBox()
        send_interval_spin.setRange(10, 60000)
        send_interval_spin.setValue(1000)
        send_interval_spin.setEnabled(False)
        send_interval_spin.setMinimumWidth(100)
        auto_send_check.toggled.connect(lambda checked: send_interval_spin.setEnabled(checked))
        setattr(self, send_interval_spin_name, send_interval_spin)
        send_ctrl.addWidget(send_interval_spin)
        
        send_layout.addLayout(send_ctrl)
        
        # Send text box
        send_text = QTextEdit()
        send_text.setFont(FONT_CONFIG["mono"])
        send_text.setLineWrapMode(QTextEdit.WidgetWidth)
        send_text.setMinimumHeight(150)
        setattr(self, send_text_name, send_text)
        send_layout.addWidget(send_text)
        
        # Send button layout
        send_btn_layout = QHBoxLayout()
        send_btn = QPushButton("发送 Send")
        send_btn.setObjectName("primaryBtn")
        send_btn.setMinimumWidth(150)
        send_btn.setMinimumHeight(38)
        send_btn.clicked.connect(lambda: self.send_data(conn_type))
        send_btn.setEnabled(False)
        setattr(self, send_btn_name, send_btn)
        
        clear_send_btn = QPushButton("清空 Clear")
        clear_send_btn.setMinimumWidth(150)
        clear_send_btn.setMinimumHeight(36)
        clear_send_btn.clicked.connect(lambda: getattr(self, send_text_name).clear())
        setattr(self, clear_send_btn_name, clear_send_btn)
        
        send_btn_layout.addWidget(send_btn)
        send_btn_layout.addWidget(clear_send_btn)
        send_layout.addLayout(send_btn_layout)
        
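        # Pass the stretch factors here: QBoxLayout.setStretch() has no effect
        # on an index that does not hold an item yet.
        layout.addWidget(receive_group, 8)
        layout.addWidget(send_group, 2)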
        
        return widget
    
    # ===================== Data recording UI =====================
    def _build_data_record_widget(self) -> QGroupBox:
        group = QGroupBox("数据记录配置 | Data Recording")
        group.setFont(FONT_CONFIG["title"])
        layout = QGridLayout(group)
        layout.setHorizontalSpacing(25)
        layout.setVerticalSpacing(20)
        
        # Record file settings
        layout.addWidget(QLabel("记录文件路径:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.record_file_edit = QLineEdit()
        self.record_file_edit.setMinimumWidth(400)
        self.record_file_edit.setReadOnly(True)
        layout.addWidget(self.record_file_edit, 0, 1)
        
        self.browse_record_btn = QPushButton("浏览...")
        self.browse_record_btn.setMinimumWidth(100)
        self.browse_record_btn.clicked.connect(self.browse_record_file)
        layout.addWidget(self.browse_record_btn, 0, 2)
        
        # Record format settings
        layout.addWidget(QLabel("记录格式:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.record_format_combo = QComboBox()
        self.record_format_combo.addItems(RECORD_FORMAT_OPTIONS)
        self.record_format_combo.setCurrentText(self.record_format)
        self.record_format_combo.currentTextChanged.connect(self._on_record_format_changed)
        layout.addWidget(self.record_format_combo, 1, 1)
        
        # Record interval settings
        layout.addWidget(QLabel("记录间隔(ms):"), 1, 2, Qt.AlignRight | Qt.AlignVCenter)
        self.record_interval_edit = QLineEdit(str(self.record_interval))
        self.record_interval_edit.setMinimumWidth(150)
        layout.addWidget(self.record_interval_edit, 1, 3)
        
        # Record status and control
        layout.addWidget(QLabel("记录状态:"), 2, 0, Qt.AlignRight | Qt.AlignVCenter)
        self.record_status_label = QLabel(RECORD_STATUS["STOPPED"])
        self.record_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
        layout.addWidget(self.record_status_label, 2, 1)
        
        self.record_control_btn = QPushButton("开始记录")
        self.record_control_btn.setObjectName("primaryBtn")
        self.record_control


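Recommended
nmgjinbin posted on 2026-5-6 15:52
Downloaded and bookmarked for now; it will probably just gather dust.
Recommended
OP | sty19890218 posted on 2025-12-19 14:56
junluo8247 posted on 2025-12-19 14:39
Can the baud rate be changed to 2,000,000 (2M)? Is that supported?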

[Python]
# Original baud rate list
self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800"])
# Modified (2000000 added)
self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800", "2000000"])
[Python]
def _build_serial_config_widget(self) -> QGroupBox:
    group = QGroupBox("串口配置 | Serial Configuration")
    group.setFont(FONT_CONFIG["title"])
    layout = QGridLayout(group)
    layout.setHorizontalSpacing(25)  # generous horizontal spacing
    layout.setVerticalSpacing(20)    # generous vertical spacing
    layout.setColumnStretch(1, 3)    # give the port combo box more width
    layout.setColumnStretch(3, 2)
    
    # Label + control (wide enough to avoid clipping)
    layout.addWidget(QLabel("串口 Port:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
    self.serial_combo = QComboBox()
    self.serial_combo.setMinimumWidth(300)  # extra-wide port combo box so the text is not truncated
    self.serial_combo.setFont(FONT_CONFIG["mono"])
    self.serial_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    layout.addWidget(self.serial_combo, 0, 1)
    
    layout.addWidget(QLabel("波特率 Baud:"), 0, 2, Qt.AlignRight | Qt.AlignVCenter)
    self.baudrate_combo = QComboBox()
    # Key change: add the 2000000 (2 Mbaud) option; whether it works depends on the adapter and driver
    self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800", "2000000"])
    self.baudrate_combo.setCurrentText("9600")  # to default to 2 Mbaud, change this to "2000000"
    self.baudrate_combo.setMinimumWidth(180)  # wider baud rate combo box
    layout.addWidget(self.baudrate_combo, 0, 3)
    
    # The rest of the code is unchanged...
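[Python]
# Replace the original baudrate_combo with a QLineEdit
self.baudrate_edit = QLineEdit("9600")
self.baudrate_edit.setMinimumWidth(180)
self.baudrate_edit.setFont(FONT_CONFIG["mono"])
layout.addWidget(self.baudrate_edit, 0, 3)

# When opening the port, read the value the user typed in
try:
    self.serial.baudrate = int(self.baudrate_edit.text())
except ValueError:
    # Non-numeric input: warn and abort instead of crashing
    QMessageBox.warning(self, "Error", "Baud rate must be an integer")
    return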
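
A free-form baud rate field accepts any text, so it may help to validate the input before handing it to the serial port. The following is only a sketch: `parse_baudrate`, `MIN_BAUD` and `MAX_BAUD` are hypothetical names that are not part of the posted tool, and the bounds are illustrative, since the rates that really work depend on the adapter and its driver.

```python
from typing import Optional

# Hypothetical bounds for illustration only; real limits depend on the
# USB-serial adapter and its driver.
MIN_BAUD = 50
MAX_BAUD = 4_000_000


def parse_baudrate(text: str, default: Optional[int] = None) -> int:
    """Convert user input to a baud rate, or fall back to `default`.

    Raises ValueError when the input is invalid and no default is given.
    """
    try:
        value: Optional[int] = int(text.strip())
    except ValueError:
        value = None  # not an integer at all

    if value is None or not (MIN_BAUD <= value <= MAX_BAUD):
        if default is not None:
            return default
        raise ValueError(f"invalid baud rate: {text!r}")
    return value


if __name__ == "__main__":
    print(parse_baudrate("2000000"))
    print(parse_baudrate("fast", default=9600))
```

In the tool this could be called as `parse_baudrate(self.baudrate_edit.text())`, with the `ValueError` caught and shown to the user.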
3#
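lnshijia posted on 2025-12-19 14:24
4#
ashirogimuto posted on 2025-12-19 14:29
I'll give it a try and see how well it works.
5#
junluo8247 posted on 2025-12-19 14:39
Can the baud rate be changed to 2,000,000 (2M)? Is that supported?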
6#
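jun269 posted on 2025-12-19 14:58
OP, when I run the code I get: ModuleNotFoundError: No module named 'serial.tools', and however I try to install that module, it doesn't work.
7#
繁花似锦丿遇见 posted on 2025-12-19 15:01

I'll give it a try and see how well it works.
8#
307921917 posted on 2025-12-19 15:21
Is there a prebuilt exe? Sure enough, the copied source throws a pile of errors as soon as I debug it, and I can't be bothered to update the required libraries.
9#
cnnets posted on 2025-12-19 15:54

Boss, some of the text doesn't adapt to display scaling on high-resolution monitors, and the button text is too small. Could it be made adaptive?
10#
OP | sty19890218 posted on 2025-12-19 16:35
jun269 posted on 2025-12-19 14:58
OP, when I run the code I get: ModuleNotFoundError: No module named 'serial.tools', and however I try to install that module, it doesn't work ...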

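Cause of the error
ModuleNotFoundError: No module named 'serial.tools' means the pyserial library is not what Python is importing (serial.tools is a submodule of pyserial, not a built-in Python module). Because the message names serial.tools rather than serial, some module called serial was found but it is not pyserial: typically the unrelated PyPI package serial, or a local file named serial.py. Do not confuse the serial package (the wrong one) with pyserial (the right one).
Solution
1. Core step: install the pyserial library
Pick the install command that matches your Python environment (run it in a terminal / command prompt):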
Environment / system                    Install command
Python 3 (general)                      pip3 install pyserial
Python 3 (pip points to Python 3)       pip install pyserial
Windows (default Python 3)              pip install pyserial
Linux/Mac (insufficient permissions)    pip3 install pyserial --user (recommended)
Linux/Mac (system-wide install)         sudo pip3 install pyserial (not recommended)
Virtual environment (activate first)    pip install pyserial
Anaconda environment                    conda install -c conda-forge pyserial
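2. Verify the installation
Run one of the following commands to show the pyserial version (this confirms the install location and version):
[Bash]
# Python 3
python3 -m pip show pyserial

# Windows/Python 3 (pip points to Python 3)
python -m pip show pyserial
If the output contains Name: pyserial and Version: xxx (for example 3.5), the installation succeeded.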
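3. Troubleshooting "installed successfully but still failing"
If the error persists after installation, the Python interpreter and pip most likely do not match:
For example, pip install pyserial installed the package into Python 2 while the code runs under Python 3;
In an IDE (PyCharm/VS Code), confirm that pyserial is installed in the project interpreter (not just the global interpreter).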
Test code
After installing, run the following code to check that the import works:
[Python]
import serial
from serial.tools import list_ports

# List all available serial ports
ports = list_ports.comports()
if ports:
    print("Available serial ports:")
    for port in ports:
        print(f"- {port.device} ({port.description})")
else:
    print("No serial ports detected")
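Additional common issues
If you see pip: command not found: add Python's Scripts directory to the system PATH (Windows), or use python3 -m pip install pyserial instead.
If the serial package (not pyserial) is installed: uninstall the wrong package first with pip3 uninstall serial, then reinstall pyserial.
Version compatibility: pyserial 3.5 supports Python 2.7/3.4+; on Python 3.10+, install pyserial 3.5 or later.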
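
The interpreter/pip mismatch and the serial vs. pyserial mix-up described above can also be checked from inside Python. The script below is a rough diagnostic sketch that uses only the standard library (Python 3.8+ for `importlib.metadata`); the helper names `has_module`, `installed_version` and `diagnose` are made up for this example. Run it with the same interpreter that runs the tool.

```python
import importlib.util
import sys
from importlib import metadata


def has_module(name: str) -> bool:
    """Return True if `name` can be located by this interpreter.

    find_spec imports parent packages but not the leaf module itself.
    """
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # Parent package missing, parent is not a package, or no spec set.
        return False


def installed_version(dist_name: str):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def diagnose() -> dict:
    """Collect the facts needed to explain a failing serial.tools import."""
    return {
        "interpreter": sys.executable,                      # which Python runs
        "pyserial_version": installed_version("pyserial"),  # the right package
        "serial_version": installed_version("serial"),      # the wrong package
        "has_serial": has_module("serial"),
        "has_list_ports": has_module("serial.tools.list_ports"),
    }


if __name__ == "__main__":
    for key, value in diagnose().items():
        print(f"{key}: {value}")
```

If `pyserial_version` is None while `has_serial` is True, a different `serial` module is probably shadowing pyserial in that interpreter.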