好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 sty19890218 于 2026-1-9 12:46 编辑
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modbus RTU 主站工具(适配版)
功能:支持 01/02/03/04(读)、05/06/15/16(写),串口收发,日志保存,数据可视化
适配:优化窗口尺寸/布局,兼容1080P/768P屏幕,最大化无错乱
"""
import sys
import time
import serial
import serial.tools.list_ports
from typing import List, Tuple, Optional, Dict, Set
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QComboBox, QPushButton, QTextEdit, QCheckBox, QLabel, QMessageBox,
QGroupBox, QGridLayout, QStatusBar, QLineEdit, QFileDialog,
QTableWidget, QTableWidgetItem, QHeaderView, QSizePolicy
)
from PyQt5.QtCore import (
QThread, pyqtSignal, Qt, QTimer, QDateTime, QMutex, QMutexLocker,
QThreadPool, QRunnable, pyqtSlot
)
from PyQt5.QtGui import QFont, QColor, QClipboard, QPalette
# ===================== 全局常量/枚举(适配优化) =====================
# Modbus exception code -> human-readable (Chinese) description.
# Looked up when a slave answers with an exception response.
MODBUS_ERROR_CODES: Dict[int, str] = {
0x01: "非法功能码",
0x02: "非法数据地址",
0x03: "非法数据值",
0x04: "从站设备故障",
0x05: "确认",
0x06: "从站设备忙",
0x07: "否定确认",
0x08: "内存奇偶校验错误"
}
# Function code -> (Chinese name, "read" | "write", English name, (min address, max address)).
# Index 0 is used in user-facing messages, index 1 selects the read/write UI mode,
# index 2 is shown in the function-code combo box, index 3 bounds the start address.
FUNC_CODE_MAP: Dict[int, Tuple[str, str, str, Tuple[int, int]]] = {
1: ("读线圈", "read", "Read Coils", (0, 65535)),
2: ("读离散输入", "read", "Read Discrete Inputs", (0, 65535)),
3: ("读保持寄存器", "read", "Read Holding Registers", (0, 65535)),
4: ("读输入寄存器", "read", "Read Input Registers", (0, 65535)),
5: ("写单线圈", "write", "Write Single Coil", (0, 65535)),
6: ("写单寄存器", "write", "Write Single Register", (0, 65535)),
15: ("写多线圈", "write", "Write Multiple Coils", (0, 65535)),
16: ("写多寄存器", "write", "Write Multiple Registers", (0, 65535))
}
# Combo-box label -> internal display-mode key understood by format_value().
DISPLAY_MODE_MAP: Dict[str, str] = {
"十进制": "dec",
"十六进制": "hex",
"二进制": "bin"
}
# Default serial settings.
# NOTE(review): not referenced anywhere in the visible part of the file — confirm it is still used.
DEFAULT_SERIAL_PARAMS = {
"baudrate": 9600,
"databits": 8,
"stopbits": 1.0,
"parity": serial.PARITY_NONE,
"rtscts": False,
"timeout": 0.1
}
# Shared fonts; sizes were reduced so the UI fits common screen resolutions.
# NOTE(review): the earlier inline notes claimed "14 -> 12" / "16 -> 14", but every entry is actually 8 pt.
FONT_CONFIG = {
"default": QFont("Microsoft YaHei", 8),  # 8 pt UI font
"mono": QFont("Consolas", 8),  # 8 pt monospace font for numeric fields and logs
"title": QFont("Microsoft YaHei", 8, QFont.Bold),  # 8 pt bold font for group-box titles
"status": QFont("Microsoft YaHei", 8)  # 8 pt font for the status bar
}
# ===================== 工具函数 =====================
def modbus_crc16(data: bytes) -> bytes:
    """Compute the Modbus RTU CRC-16 of *data*.

    Uses the reflected polynomial 0xA001 with an initial value of 0xFFFF.

    Args:
        data: Frame bytes to checksum (any iterable of ints 0-255 works,
            e.g. ``bytes`` or ``bytearray``).

    Returns:
        Two bytes, low byte first, ready to be appended to the frame.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Shift right; fold in the polynomial whenever a 1 bit falls out.
            crc = (crc >> 1) ^ 0xA001 if crc & 0x0001 else crc >> 1
    return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def validate_numeric_input(text: str, min_val: int = 0, max_val: int = 65535, field_name: str = "值") -> Optional[int]:
    """Parse *text* as a decimal integer and check it lies in ``[min_val, max_val]``.

    On failure a warning dialog is shown and ``None`` is returned.

    Args:
        text: Raw user input; surrounding whitespace is ignored.
        min_val: Smallest accepted value (inclusive).
        max_val: Largest accepted value (inclusive).
        field_name: Label used in the warning message.

    Returns:
        The parsed integer, or ``None`` when the input is not an integer or is
        out of range. Note that ``0`` is a valid result, so callers must test
        the result with ``is not None`` rather than truthiness.
    """
    try:
        val = int(text.strip())
    except ValueError:
        QMessageBox.warning(None, "输入错误", f"{field_name}请输入有效的整数")
        return None
    if min_val <= val <= max_val:
        return val
    QMessageBox.warning(None, "输入错误", f"{field_name}需在 {min_val}-{max_val} 之间")
    return None
def validate_modbus_addr(func_code: int, addr: int) -> bool:
    """Check that *addr* is inside the address range configured for *func_code*.

    The range and the display name are taken from ``FUNC_CODE_MAP``. A warning
    dialog is shown when the address is out of range.

    Args:
        func_code: Modbus function code; must be a key of ``FUNC_CODE_MAP``.
        addr: Start address to validate.

    Returns:
        ``True`` if the address is within range, ``False`` otherwise.
    """
    name, _, _, (min_addr, max_addr) = FUNC_CODE_MAP[func_code]
    if min_addr <= addr <= max_addr:
        return True
    QMessageBox.warning(None, "地址错误",
                        f"{name}的地址范围为 {min_addr}-{max_addr}")
    return False
def format_value(val: int | bool, mode: str) -> str:
if isinstance(val, bool):
return "1" if val else "0"
if mode == "hex":
return f"0x{val:04X}"
elif mode == "bin":
return f"0b{val:016b}"
return str(val)
def parse_write_values(text: str, func_code: int, count: int) -> Optional[List[int | bool]]:
try:
values = [v.strip() for v in text.split(",") if v.strip()]
write_values = []
if func_code in [5, 6] and len(values) != 1:
raise Exception(f"{FUNC_CODE_MAP[func_code][0]}仅支持单个值")
if func_code in [15, 16] and len(values) < count:
raise Exception(f"{FUNC_CODE_MAP[func_code][0]}需输入{count}个值(当前{len(values)}个)")
for idx, val in enumerate(values[:count]):
if func_code in [5, 15]:
if val.lower() in ["1", "true", "on"]:
write_values.append(True)
elif val.lower() in ["0", "false", "off"]:
write_values.append(False)
else:
raise Exception(f"线圈值{idx+1}需为 0/1/True/False")
else:
int_val = int(val)
if not (0 <= int_val <= 65535):
raise Exception(f"寄存器值{idx+1}需在 0-65535 之间")
write_values.append(int_val)
return write_values
except Exception as e:
QMessageBox.warning(None, "写值解析失败", str(e))
return None
def scan_valid_ports() -> List[str]:
    """List the serial ports currently reported by pySerial, minus filtered ones.

    Ports whose description contains "bluetooth", "蓝牙", "virtual" or "虚拟"
    (case-insensitive) are skipped.

    Returns:
        Sorted, de-duplicated entries formatted as ``"<device> - <description>"``.
    """
    filter_words = ("bluetooth", "蓝牙", "virtual", "虚拟")
    valid_ports: Set[str] = set()
    for port in serial.tools.list_ports.comports():
        if not port.device:
            continue
        # Treat a missing description as empty so .lower() cannot fail.
        description = (port.description or "").lower()
        if any(word in description for word in filter_words):
            continue
        valid_ports.add(f"{port.device} - {port.description}")
    return sorted(valid_ports)
# ===================== 串口接收线程 =====================
class SerialReceiveThread(QThread):
    """Background thread that polls a serial port and forwards received bytes.

    Signals:
        receive_signal(bytes): emitted with every chunk read from the port.
        error_signal(str): emitted once when reading raises; the loop then ends.
    """

    receive_signal = pyqtSignal(bytes)
    error_signal = pyqtSignal(str)

    def __init__(self, serial_obj: serial.Serial):
        """Store the shared serial object; the thread is not started here."""
        super().__init__()
        self.serial = serial_obj
        self._is_running = False
        self._mutex = QMutex()  # guards _is_running

    def run(self) -> None:
        """Poll the port roughly every 5 ms until stopped or the port closes."""
        # QThread.setPriority() only has an effect on a running thread, so it
        # is applied here rather than in __init__.
        self.setPriority(QThread.LowPriority)
        with QMutexLocker(self._mutex):
            self._is_running = True
        while True:
            with QMutexLocker(self._mutex):
                if not self._is_running or not self.serial.is_open:
                    break
            try:
                if self.serial.in_waiting > 0:
                    data = self.serial.read(self.serial.in_waiting)
                    self.receive_signal.emit(data)
                time.sleep(0.005)
            except Exception as e:
                # Thread boundary: report the failure to the UI and stop polling.
                self.error_signal.emit(f"接收异常:{str(e)}")
                break

    def stop(self) -> None:
        """Ask the loop to exit and wait for the thread to finish.

        If the thread has not finished within 2 s it is forcibly terminated.
        """
        with QMutexLocker(self._mutex):
            self._is_running = False
        if not self.wait(2000):
            # Last resort only: terminate() gives the thread no chance to clean up.
            self.terminate()
            self.wait()
# ===================== Modbus 操作线程 =====================
class ModbusThread(QThread):
    """Worker thread that performs Modbus RTU master transactions.

    Depending on how it was configured, one run either polls a read function
    code (01/02/03/04) repeatedly or executes a single write (05/06/15/16).

    Signals:
        poll_result(list, int, int): parsed values (empty on failure or after
            a write), transmit counter, error counter.
        status_update(str): short status text.
        log_update(str, str): log category and message.
        write_result(bool, str): outcome of a write operation.

    NOTE(review): this thread reads from the same ``serial.Serial`` object that
    ``SerialReceiveThread`` polls. If both run at once they compete for the
    reply bytes — confirm the caller never runs them concurrently.
    """

    poll_result = pyqtSignal(list, int, int)
    status_update = pyqtSignal(str)
    log_update = pyqtSignal(str, str)
    write_result = pyqtSignal(bool, str)

    # Extra time (seconds) granted to the slave on top of the raw wire time.
    RESPONSE_TIMEOUT_S = 0.5
    # Normal replies to 05/06/15/16 are always 8 bytes long.
    _WRITE_RESPONSE_LEN = 8
    # An exception reply is: slave id, function | 0x80, exception code, CRC (2 bytes).
    _EXCEPTION_RESPONSE_LEN = 5

    def __init__(self, serial_obj: serial.Serial):
        """Store the shared serial object and set default request parameters."""
        super().__init__()
        self.serial = serial_obj
        self._is_running = False
        self._mutex = QMutex()  # guards _is_running and parameter updates
        self.slave_id: int = 1
        self.func_code: int = 3
        self.start_addr: int = 0
        self.count: int = 10
        self.interval: int = 1000  # polling period in milliseconds
        self.write_values: List[int | bool] = []
        self.is_write_op: bool = False
        self.tx_count: int = 0
        self.err_count: int = 0

    def set_read_params(self, slave_id: int, func_code: int, start_addr: int, count: int, interval: int) -> None:
        """Configure a polling read; *count* is capped at 1000 and counters reset."""
        with QMutexLocker(self._mutex):
            self.slave_id = slave_id
            self.func_code = func_code
            self.start_addr = start_addr
            self.count = min(count, 1000)
            self.interval = interval
            self.is_write_op = False
            self.tx_count = 0
            self.err_count = 0

    def set_write_params(self, slave_id: int, func_code: int, start_addr: int, count: int, values: List[int | bool]) -> None:
        """Configure a one-shot write; *values* is truncated to the capped *count*."""
        with QMutexLocker(self._mutex):
            self.slave_id = slave_id
            self.func_code = func_code
            self.start_addr = start_addr
            self.count = min(count, 1000)
            self.write_values = values[:self.count]
            self.is_write_op = True
            self.tx_count = 0
            self.err_count = 0

    def run(self) -> None:
        """Thread entry point: run the configured write once or the read loop."""
        with QMutexLocker(self._mutex):
            self._is_running = True
        try:
            if self.is_write_op:
                self._execute_write()
            else:
                self._execute_read_loop()
        except Exception as e:
            # Thread boundary: surface anything unexpected instead of dying silently.
            err_msg = f"操作异常:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
        with QMutexLocker(self._mutex):
            self._is_running = False

    def _execute_read_loop(self) -> None:
        """Poll until stopped or the port closes, pausing ``interval`` ms between reads."""
        while True:
            with QMutexLocker(self._mutex):
                if not self._is_running or not self.serial.is_open:
                    break
            self._execute_read_once()
            # Sleep in 10 ms slices so a stop request is noticed quickly.
            for _ in range(int(self.interval / 10)):
                time.sleep(0.01)
                with QMutexLocker(self._mutex):
                    if not self._is_running:
                        break

    def _execute_read_once(self) -> None:
        """Send one read request, validate and parse the reply, emit the result."""
        try:
            req_data = self._build_read_request()
            if not req_data:
                return
            resp_data = self._transact(req_data, self._expected_read_response_len())
            self._validate_response(resp_data)
            data_list = self._parse_response(resp_data)
            self.poll_result.emit(data_list, self.tx_count, self.err_count)
            self.status_update.emit(f"读成功:{self.count}个{FUNC_CODE_MAP[self.func_code][0]}")
        except Exception as e:
            self.err_count += 1
            err_msg = f"读失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误",
                                 f"{err_msg} | Tx={self.tx_count} Err={self.err_count}")
            self.poll_result.emit([], self.tx_count, self.err_count)

    def _execute_write(self) -> None:
        """Send one write request, validate the reply, emit the outcome."""
        try:
            req_data = self._build_write_request()
            if not req_data:
                # The builder already reported the details via status/log; also
                # tell write_result listeners that the write did not happen.
                self.write_result.emit(False, "写报文组装失败")
                return
            resp_data = self._transact(req_data, self._WRITE_RESPONSE_LEN)
            self._validate_write_response(req_data, resp_data)
            success_msg = f"写成功:{self.count}个{FUNC_CODE_MAP[self.func_code][0]}"
            self.write_result.emit(True, success_msg)
            self.status_update.emit(success_msg)
            self.poll_result.emit([], self.tx_count, self.err_count)
        except Exception as e:
            self.err_count += 1
            err_msg = f"写失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误",
                                 f"{err_msg} | Tx={self.tx_count} Err={self.err_count}")
            self.write_result.emit(False, err_msg)
            self.poll_result.emit([], self.tx_count, self.err_count)

    def _expected_read_response_len(self) -> int:
        """Length of a normal reply: id + func + byte count + data + CRC(2)."""
        if self.func_code in (1, 2):
            return 5 + (self.count + 7) // 8
        return 5 + self.count * 2

    def _transact(self, req_data: bytearray, expected_len: int) -> bytes:
        """Send *req_data*, log it, and return the raw reply (also logged)."""
        # Drop stale bytes so they cannot be mistaken for this request's reply.
        self.serial.reset_input_buffer()
        self.serial.write(req_data)
        self.tx_count += 1
        self.log_update.emit("Modbus-发送",
                             f"从站{self.slave_id} 功能码{self.func_code:02X} | {req_data.hex(' ')}")
        resp_data = self._read_response(len(req_data), expected_len)
        self.log_update.emit("Modbus-接收",
                             f"从站{self.slave_id} | {resp_data.hex(' ')}")
        return resp_data

    def _read_response(self, tx_len: int, expected_len: int) -> bytes:
        """Collect reply bytes until the frame is complete or the deadline passes.

        Reading stops early once *expected_len* bytes arrived or a complete
        exception reply was received.

        Raises:
            RuntimeError: if no byte at all arrived before the deadline.
        """
        # 10 bit-times per character is used as the on-wire estimate.
        char_time = 10 / self.serial.baudrate
        deadline = time.monotonic() + (tx_len + expected_len) * char_time + self.RESPONSE_TIMEOUT_S
        buf = bytearray()
        while time.monotonic() < deadline:
            pending = self.serial.in_waiting
            if not pending:
                time.sleep(0.002)
                continue
            buf.extend(self.serial.read(pending))
            if len(buf) >= expected_len:
                break
            if len(buf) >= self._EXCEPTION_RESPONSE_LEN and buf[1] & 0x80:
                break
        if not buf:
            raise RuntimeError("从站无响应")
        return bytes(buf)

    def _build_read_request(self) -> Optional[bytearray]:
        """Build a read frame: id, function, start address, quantity, CRC.

        Returns ``None`` (after reporting via status/log) if assembly fails.
        """
        try:
            req = bytearray([self.slave_id, self.func_code])
            req.extend(self.start_addr.to_bytes(2, 'big'))
            req.extend(self.count.to_bytes(2, 'big'))
            req.extend(modbus_crc16(req))
            return req
        except Exception as e:
            err_msg = f"读报文组装失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
            return None

    def _build_write_request(self) -> Optional[bytearray]:
        """Build the write frame for function code 05, 06, 15 or 16.

        Returns ``None`` (after reporting via status/log) if assembly fails,
        e.g. unsupported function code, missing values or oversized payload.
        """
        try:
            req = bytearray([self.slave_id, self.func_code])
            if self.func_code in (5, 6):
                if not self.write_values:
                    raise ValueError("缺少写入值")
                req.extend(self.start_addr.to_bytes(2, 'big'))
                if self.func_code == 5:
                    # Write Single Coil encodes ON as 0xFF00 and OFF as 0x0000.
                    req.extend((0xFF00 if self.write_values[0] else 0x0000).to_bytes(2, 'big'))
                else:
                    req.extend(int(self.write_values[0]).to_bytes(2, 'big'))
            elif self.func_code in (15, 16):
                if len(self.write_values) < self.count:
                    raise ValueError(f"写入值数量不足(需要{self.count}个,当前{len(self.write_values)}个)")
                byte_count = (self.count + 7) // 8 if self.func_code == 15 else self.count * 2
                if byte_count > 255:
                    # The byte-count field is a single byte.
                    raise ValueError(f"数量过大:数据字节数{byte_count}超过255")
                req.extend(self.start_addr.to_bytes(2, 'big'))
                req.extend(self.count.to_bytes(2, 'big'))
                req.append(byte_count)
                if self.func_code == 15:
                    # Coils are packed LSB-first, eight per byte.
                    coil_bytes = bytearray(byte_count)
                    for idx, val in enumerate(self.write_values[:self.count]):
                        if val:
                            coil_bytes[idx // 8] |= 1 << (idx % 8)
                    req.extend(coil_bytes)
                else:
                    for val in self.write_values[:self.count]:
                        req.extend(int(val).to_bytes(2, 'big'))
            else:
                raise ValueError(f"不支持的写功能码:{self.func_code}")
            req.extend(modbus_crc16(req))
            return req
        except Exception as e:
            err_msg = f"写报文组装失败:{str(e)}"
            self.status_update.emit(err_msg)
            self.log_update.emit("Modbus-错误", err_msg)
            return None

    def _check_frame(self, resp_data: bytes) -> None:
        """Checks shared by read and write replies.

        Verifies minimum length, CRC, slave id, exception flag and function code.

        Raises:
            RuntimeError: describing the first check that failed.
        """
        if len(resp_data) < 5:
            raise RuntimeError("响应数据过短(小于5字节)")
        resp_crc = resp_data[-2:]
        calc_crc = modbus_crc16(resp_data[:-2])
        if resp_crc != calc_crc:
            raise RuntimeError(f"CRC校验失败(接收:{resp_crc.hex()} 计算:{calc_crc.hex()})")
        if resp_data[0] != self.slave_id:
            raise RuntimeError(f"从站地址不匹配(接收:{resp_data[0]} 预期:{self.slave_id})")
        if resp_data[1] == self.func_code + 0x80:
            err_code = resp_data[2]
            raise RuntimeError(f"从站异常:{MODBUS_ERROR_CODES.get(err_code, f'未知错误{err_code:02X}')}")
        if resp_data[1] != self.func_code:
            raise RuntimeError(f"功能码不匹配(接收:{resp_data[1]} 预期:{self.func_code})")

    def _validate_response(self, resp_data: bytes) -> None:
        """Validate a read reply; raises ``RuntimeError`` on any problem."""
        self._check_frame(resp_data)

    def _validate_write_response(self, req_data: bytes, resp_data: bytes) -> None:
        """Validate a write reply; raises ``RuntimeError`` on any problem.

        05/06 replies must echo the request; 15/16 replies must echo the start
        address and quantity.
        """
        # Run the common checks first so an exception reply is reported as
        # such instead of as an echo/address mismatch.
        self._check_frame(resp_data)
        if self.func_code in (5, 6):
            if resp_data != req_data:
                raise RuntimeError("写响应与请求不匹配")
        elif self.func_code in (15, 16):
            if len(resp_data) < self._WRITE_RESPONSE_LEN:
                raise RuntimeError("响应数据过短(小于8字节)")
            resp_addr = int.from_bytes(resp_data[2:4], 'big')
            resp_count = int.from_bytes(resp_data[4:6], 'big')
            if resp_addr != self.start_addr or resp_count != self.count:
                raise RuntimeError(f"地址/数量不匹配(地址:{resp_addr}≠{self.start_addr} 数量:{resp_count}≠{self.count})")

    def _parse_response(self, resp_data: bytes) -> List[int | bool]:
        """Extract ``count`` values from a validated read reply.

        Coil/discrete-input replies yield booleans, register replies yield
        integers. Values missing from a short reply are ``False`` / ``0``.
        """
        # Data sits between the 3-byte header and the 2-byte CRC.
        payload = resp_data[3:-2]
        data_list: List[int | bool] = []
        if self.func_code in (1, 2):
            usable = min(resp_data[2], len(payload))
            for idx in range(self.count):
                byte_idx = idx // 8
                if byte_idx < usable:
                    data_list.append((payload[byte_idx] & (1 << (idx % 8))) != 0)
                else:
                    data_list.append(False)
        elif self.func_code in (3, 4):
            for i in range(self.count):
                chunk = payload[i * 2:i * 2 + 2]
                # Only accept complete 2-byte registers; never read into the CRC.
                data_list.append(int.from_bytes(chunk, 'big') if len(chunk) == 2 else 0)
        return data_list

    def stop(self) -> None:
        """Ask the worker to stop and wait for it; terminate after 2 s."""
        with QMutexLocker(self._mutex):
            self._is_running = False
        if not self.wait(2000):
            # Last resort only: terminate() gives the thread no chance to clean up.
            self.terminate()
            self.wait()
# ===================== 主窗口(布局适配优化) =====================
class ModbusRTUMainWindow(QMainWindow):
def __init__(self):
    """Create the main window, its timers and state, build the UI and show it."""
    super().__init__()
    self.setWindowTitle("Modbus RTU 主站工具(适配版)")
    # Initial geometry: 1200x800 at (100, 100).
    self.setGeometry(100, 100, 1200, 800)
    # The window may not be shrunk below 800x600.
    self.setMinimumSize(800, 600)
    self._setup_global_style()

    # Serial port and worker threads (the threads are created elsewhere).
    self.serial: serial.Serial = serial.Serial()
    self.receive_thread: Optional[SerialReceiveThread] = None
    self.modbus_thread: Optional[ModbusThread] = None

    # Timer driving repeated sends.
    self.loop_send_timer = QTimer()
    self.loop_send_timer.timeout.connect(self.send_data)
    # Re-scan the available serial ports every 2 s.
    self.port_scan_timer = QTimer()
    self.port_scan_timer.timeout.connect(self.scan_serial_ports)
    self.port_scan_timer.start(2000)

    # UI / session state.
    self.is_serial_open: bool = False
    self.is_loop_sending: bool = False
    self.display_mode: str = "dec"
    self.total_receive_bytes: int = 0
    self.session_receive_bytes: int = 0

    self._init_ui()
    self.show()
def _setup_global_style(self):
    """Apply the light colour palette and the application-wide style sheet.

    The style sheet no longer contains ``sizePolicy`` declarations: that is
    not a Qt Style Sheet property. Size policies are set on the widgets
    themselves with ``setSizePolicy()``.
    """
    palette = QPalette()
    palette.setColor(QPalette.Window, QColor(245, 245, 245))
    palette.setColor(QPalette.WindowText, QColor(50, 50, 50))
    palette.setColor(QPalette.Base, QColor(255, 255, 255))
    palette.setColor(QPalette.AlternateBase, QColor(240, 240, 240))
    palette.setColor(QPalette.Button, QColor(230, 230, 230))
    palette.setColor(QPalette.ButtonText, QColor(50, 50, 50))
    self.setPalette(palette)
    # NOTE(review): the /* ... */ notes inside the style sheet describe earlier
    # sizes and do not match the values actually set — treat the values as
    # authoritative.
    self.setStyleSheet("""
QMainWindow {background: #F5F5F5;}
QGroupBox {
font-weight: bold;
font-size: 10px; /* 原16 → 14 */
border: 1px solid #DDD;
border-radius: 6px;
margin-top: 6px; /* 原12 → 8 */
padding-top: 6px; /* 原10 → 8 */
}
QPushButton {
background: #E8E8E8;
border: 1px solid #CCC;
border-radius: 6px;
padding: 6px 14px; /* 原10px20px → 8px16px */
font-size: 6px; /* 原14 → 12 */
min-height: 20px; /* 原40 → 35 */
}
QPushButton:hover {background: #D8D8D8;}
QPushButton:disabled {background: #F0F0F0; color: #999;}
QPushButton#primaryBtn {
background: #4A90E2;
color: white;
border: none;
min-height: 25px; /* 原45 → 40 */
}
QPushButton#primaryBtn:hover {background: #357ABD;}
QLabel {font-size: 10px; color: #333;} /* 原14 → 12 */
QTableWidget {
gridline-color: #DDD;
font-family: Consolas;
font-size: 10px; /* 原14 → 12 */
border: 1px solid #EEE;
border-radius: 4px;
min-height: 200px; /* 原500 → 300 */
}
QTableWidget::item:selected {
background: #E1F0FF;
color: #333;
}
QStatusBar {font-size: 10px; color: #333; background: #F0F0F0;} /* 原14 → 12 */
QLineEdit, QComboBox, QTextEdit {
border: 1px solid #CCC;
border-radius: 4px;
padding: 4px 8px; /* 原6px10px → 4px8px */
font-size: 10px; /* 原14 → 12 */
min-height: 20px; /* 原35 → 30 */
}
QLineEdit:focus, QComboBox:focus, QTextEdit:focus {
border: 1px solid #4A90E2;
outline: none;
}
""")
def _init_ui(self) -> None:
    """Assemble the central widget (three stacked sections) and the status bar."""
    central_widget = QWidget()
    self.setCentralWidget(central_widget)
    main_layout = QVBoxLayout(central_widget)
    main_layout.setSpacing(10)
    main_layout.setContentsMargins(10, 10, 10, 10)
    main_layout.addWidget(self._build_serial_config_widget())
    main_layout.addWidget(self._build_modbus_config_widget())
    # Only the receive/send area has a stretch factor, so it absorbs extra
    # vertical space when the window grows.
    main_layout.addWidget(self._build_io_widget(), stretch=1)

    self.status_bar = QStatusBar()
    self.setStatusBar(self.status_bar)
    self.status_bar.setFont(FONT_CONFIG["status"])
    self.status_bar.showMessage("就绪 | 未连接串口", 0)
def _build_serial_config_widget(self) -> QGroupBox:
    """Build the "serial configuration" group box.

    Creates the port, baud-rate, data-bits, stop-bits and parity selectors,
    the RTS/CTS check box and the open/close button, and stores them on
    ``self`` for use by the serial handlers.

    Returns:
        The populated group box.
    """
    group = QGroupBox("串口配置 | Serial Configuration")
    group.setFont(FONT_CONFIG["title"])
    layout = QGridLayout(group)
    layout.setHorizontalSpacing(10)
    layout.setVerticalSpacing(5)
    label_align = Qt.AlignRight | Qt.AlignVCenter

    # Row 0: port and baud rate.
    layout.addWidget(QLabel("串口 Port:"), 0, 0, label_align)
    self.serial_combo = QComboBox()
    self.serial_combo.setMinimumWidth(150)
    self.serial_combo.setFont(FONT_CONFIG["mono"])
    # Let the port selector widen with the window.
    self.serial_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    layout.addWidget(self.serial_combo, 0, 1)

    layout.addWidget(QLabel("波特率 Baud:"), 0, 2, label_align)
    self.baudrate_combo = QComboBox()
    self.baudrate_combo.addItems(["9600", "19200", "38400", "57600", "115200", "230400", "460800"])
    self.baudrate_combo.setCurrentText("9600")
    self.baudrate_combo.setMinimumWidth(50)
    layout.addWidget(self.baudrate_combo, 0, 3)

    # Row 1: data bits and stop bits.
    layout.addWidget(QLabel("数据位 Data:"), 1, 0, label_align)
    self.databits_combo = QComboBox()
    self.databits_combo.addItems(["5", "6", "7", "8"])
    self.databits_combo.setCurrentText("8")
    self.databits_combo.setMinimumWidth(50)
    layout.addWidget(self.databits_combo, 1, 1)

    layout.addWidget(QLabel("停止位 Stop:"), 1, 2, label_align)
    self.stopbits_combo = QComboBox()
    self.stopbits_combo.addItems(["1", "1.5", "2"])
    self.stopbits_combo.setCurrentText("1")
    self.stopbits_combo.setMinimumWidth(50)
    layout.addWidget(self.stopbits_combo, 1, 3)

    # Row 2: parity and flow control.
    layout.addWidget(QLabel("校验位 Parity:"), 2, 0, label_align)
    self.parity_combo = QComboBox()
    self.parity_combo.addItems(["无 None", "奇校验 Odd", "偶校验 Even"])
    self.parity_combo.setCurrentText("无 None")
    self.parity_combo.setMinimumWidth(50)
    layout.addWidget(self.parity_combo, 2, 1)

    layout.addWidget(QLabel("流控 Flow:"), 2, 2, label_align)
    self.rts_cts_check = QCheckBox("RTS/CTS")
    self.rts_cts_check.setMinimumHeight(20)
    layout.addWidget(self.rts_cts_check, 2, 3)

    # Open/close button spans all three rows in the last column.
    self.open_close_btn = QPushButton("打开串口 Open")
    self.open_close_btn.setObjectName("primaryBtn")
    self.open_close_btn.setMinimumWidth(50)
    self.open_close_btn.setMinimumHeight(25)
    self.open_close_btn.clicked.connect(self.toggle_serial)
    layout.addWidget(self.open_close_btn, 0, 4, 3, 1)
    return group
def _build_modbus_config_widget(self) -> QGroupBox:
    """Build the "Modbus configuration" group box.

    Grid rows:
        0 - connection status label and Tx/Err/ID/F/SR counters
        1 - slave id, function code, start address, quantity
        2 - scan period, write values, display mode
        3 - action buttons (poll, clear data, clear log, write)
        4 - address/value table

    All created widgets are stored on ``self``.

    Returns:
        The populated group box.
    """
    group = QGroupBox("Modbus 配置 | RTU Master(01/02/03/04/05/06/15/16)")
    group.setFont(FONT_CONFIG["title"])
    layout = QGridLayout(group)
    layout.setHorizontalSpacing(15)
    layout.setVerticalSpacing(10)

    # ---- Row 0: status line ----
    self.modbus_status_label = QLabel("Disconnected")
    self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 6px;")
    layout.addWidget(self.modbus_status_label, 0, 0, 1, 2)

    status_widget = QWidget()
    status_layout = QHBoxLayout(status_widget)
    status_layout.setSpacing(15)
    self.tx_label = QLabel("Tx = 0")
    self.tx_label.setStyleSheet("color: #2980B9; font-weight: bold;")
    self.err_label = QLabel("Err = 0")
    self.err_label.setStyleSheet("color: #E74C3C; font-weight: bold;")
    self.id_label = QLabel("ID = 1")
    self.f_label = QLabel("F = 03")
    self.sr_label = QLabel("SR = 1000ms")
    for lbl in [self.tx_label, self.err_label, self.id_label, self.f_label, self.sr_label]:
        lbl.setFont(FONT_CONFIG["mono"])
        lbl.setMinimumHeight(10)
        status_layout.addWidget(lbl)
    status_layout.addStretch()
    layout.addWidget(status_widget, 0, 2, 1, 8)

    # ---- Row 1: request parameters ----
    layout.addWidget(QLabel("从站地址 ID:"), 1, 0, Qt.AlignRight)
    self.slave_id_edit = QLineEdit("1")
    self.slave_id_edit.setMinimumWidth(50)
    self.slave_id_edit.setMinimumHeight(10)
    self.slave_id_edit.setFont(FONT_CONFIG["mono"])
    self.slave_id_edit.editingFinished.connect(lambda: self._update_modbus_status("id"))
    layout.addWidget(self.slave_id_edit, 1, 1)

    layout.addWidget(QLabel("功能码 Func:"), 1, 2, Qt.AlignRight)
    self.func_code_combo = QComboBox()
    self.func_code_combo.setMinimumWidth(100)
    # Let the function-code selector widen with the window.
    self.func_code_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    # The numeric function code is stored as item data for later lookup.
    for code, (name, _, desc, _) in FUNC_CODE_MAP.items():
        self.func_code_combo.addItem(f"{code:02d} {name} ({desc})", code)
    self.func_code_combo.currentIndexChanged.connect(self._on_func_code_changed)
    layout.addWidget(self.func_code_combo, 1, 3)

    layout.addWidget(QLabel("起始地址 Addr:"), 1, 4, Qt.AlignRight)
    self.start_addr_edit = QLineEdit("0")
    self.start_addr_edit.setMinimumWidth(50)
    self.start_addr_edit.setMinimumHeight(10)
    self.start_addr_edit.setFont(FONT_CONFIG["mono"])
    self.start_addr_edit.editingFinished.connect(lambda: self._validate_addr_input())
    layout.addWidget(self.start_addr_edit, 1, 5)

    layout.addWidget(QLabel("数量 Qty:"), 1, 6, Qt.AlignRight)
    self.count_edit = QLineEdit("10")
    self.count_edit.setMinimumWidth(50)
    self.count_edit.setMinimumHeight(10)
    self.count_edit.setFont(FONT_CONFIG["mono"])
    self.count_edit.editingFinished.connect(self._adjust_table_rows)
    layout.addWidget(self.count_edit, 1, 7)

    # ---- Row 2: scan period, write values, display mode ----
    layout.addWidget(QLabel("扫描周期 MS:"), 2, 0, Qt.AlignRight)
    self.interval_edit = QLineEdit("1000")
    self.interval_edit.setMinimumWidth(100)
    self.interval_edit.setMinimumHeight(20)
    self.interval_edit.setFont(FONT_CONFIG["mono"])
    self.interval_edit.editingFinished.connect(lambda: self._update_modbus_status("interval"))
    layout.addWidget(self.interval_edit, 2, 1)

    layout.addWidget(QLabel("写操作值 Value:"), 2, 2, Qt.AlignRight)
    self.write_value_edit = QLineEdit()
    self.write_value_edit.setMinimumWidth(150)
    self.write_value_edit.setMinimumHeight(10)
    self.write_value_edit.setFont(FONT_CONFIG["mono"])
    self.write_value_edit.setPlaceholderText("线圈:0/1/True/False | 寄存器:0-65535 | 多值用逗号分隔")
    # Disabled until a write function code is selected.
    self.write_value_edit.setEnabled(False)
    self.write_value_edit.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    layout.addWidget(self.write_value_edit, 2, 3, 1, 3)

    layout.addWidget(QLabel("显示模式:"), 2, 6, Qt.AlignRight)
    self.display_mode_combo = QComboBox()
    self.display_mode_combo.setMinimumWidth(50)
    self.display_mode_combo.addItems(DISPLAY_MODE_MAP.keys())
    self.display_mode_combo.currentTextChanged.connect(self._on_display_mode_changed)
    layout.addWidget(self.display_mode_combo, 2, 7)

    # ---- Row 3: action buttons ----
    self.modbus_btn = QPushButton("开始轮询 Start Poll")
    self.modbus_btn.setObjectName("primaryBtn")
    self.modbus_btn.setMinimumWidth(50)
    self.modbus_btn.setMinimumHeight(10)
    self.modbus_btn.clicked.connect(self.toggle_modbus_operation)
    self.modbus_btn.setEnabled(False)
    layout.addWidget(self.modbus_btn, 3, 0, 1, 2)

    self.clear_modbus_btn = QPushButton("清空数据 Clear")
    self.clear_modbus_btn.setMinimumWidth(120)
    self.clear_modbus_btn.setMinimumHeight(20)
    self.clear_modbus_btn.clicked.connect(self.clear_modbus_table)
    layout.addWidget(self.clear_modbus_btn, 3, 2, 1, 2)

    self.clear_modbus_log_btn = QPushButton("清空日志 Clear Log")
    self.clear_modbus_log_btn.setMinimumWidth(50)
    self.clear_modbus_log_btn.setMinimumHeight(10)
    self.clear_modbus_log_btn.clicked.connect(self.clear_receive)
    layout.addWidget(self.clear_modbus_log_btn, 3, 4, 1, 2)

    self.manual_write_btn = QPushButton("执行写操作 Write")
    self.manual_write_btn.setObjectName("primaryBtn")
    self.manual_write_btn.setMinimumWidth(50)
    self.manual_write_btn.setMinimumHeight(10)
    self.manual_write_btn.clicked.connect(self.execute_manual_write)
    self.manual_write_btn.setEnabled(False)
    layout.addWidget(self.manual_write_btn, 3, 6, 1, 2)

    # ---- Row 4: data table ----
    self.modbus_table = QTableWidget()
    self.modbus_table.setColumnCount(2)
    self.modbus_table.setHorizontalHeaderLabels(["Address (Dec)", "Value"])
    # Address column fits its contents; the value column takes the rest.
    self.modbus_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
    self.modbus_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
    self.modbus_table.setRowCount(10)
    self.modbus_table.setAlternatingRowColors(True)
    self.modbus_table.setMinimumHeight(100)
    self.modbus_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    self._init_modbus_table()
    layout.addWidget(self.modbus_table, 4, 0, 1, 8)
    return group
def _build_io_widget(self) -> QWidget:
    """Build the bottom area: receive log (left) and transmit panel (right).

    All created controls are stored on ``self``. The two group boxes share
    the horizontal space with stretch factors 7 (receive) and 1 (send).

    Returns:
        The container widget holding both group boxes.
    """
    widget = QWidget()
    layout = QHBoxLayout(widget)
    layout.setSpacing(10)

    # ================= Receive log =================
    receive_group = QGroupBox("接收日志 | Receive Log")
    receive_group.setFont(FONT_CONFIG["title"])
    receive_layout = QVBoxLayout(receive_group)
    receive_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)

    # Option check boxes, log filter and action buttons.
    receive_ctrl = QHBoxLayout()
    self.hex_receive_check = QCheckBox("十六进制显示 Hex")
    self.timestamp_check = QCheckBox("显示时间戳 Time")
    self.timestamp_check.setChecked(True)
    self.auto_wrap_check = QCheckBox("自动换行 Wrap")
    self.auto_wrap_check.setChecked(True)
    self.auto_scroll_check = QCheckBox("自动滚屏 Scroll")
    self.auto_scroll_check.setChecked(True)
    self.log_filter_combo = QComboBox()
    self.log_filter_combo.addItems(["全部日志", "仅Modbus", "仅串口"])
    self.log_filter_combo.setMinimumWidth(80)
    self.log_filter_combo.currentTextChanged.connect(self._filter_receive_log)
    for cb in [self.hex_receive_check, self.timestamp_check, self.auto_wrap_check, self.auto_scroll_check]:
        cb.setMinimumHeight(20)
    receive_ctrl.addWidget(self.hex_receive_check)
    receive_ctrl.addWidget(self.timestamp_check)
    receive_ctrl.addWidget(self.auto_wrap_check)
    receive_ctrl.addWidget(self.auto_scroll_check)
    receive_ctrl.addWidget(QLabel("日志过滤:"))
    receive_ctrl.addWidget(self.log_filter_combo)
    receive_ctrl.addStretch()

    self.copy_receive_btn = QPushButton("复制 Copy")
    self.copy_receive_btn.setMinimumWidth(80)
    self.copy_receive_btn.setMinimumHeight(20)
    self.copy_receive_btn.clicked.connect(self.copy_receive_content)
    self.save_log_btn = QPushButton("保存 Save")
    self.save_log_btn.setMinimumWidth(80)
    self.save_log_btn.setMinimumHeight(20)
    self.save_log_btn.clicked.connect(self.save_receive_log)
    self.clear_receive_btn = QPushButton("清空 Clear")
    self.clear_receive_btn.setMinimumWidth(80)
    self.clear_receive_btn.setMinimumHeight(20)
    self.clear_receive_btn.clicked.connect(self.clear_receive)
    receive_ctrl.addWidget(self.copy_receive_btn)
    receive_ctrl.addWidget(self.save_log_btn)
    receive_ctrl.addWidget(self.clear_receive_btn)
    receive_layout.addLayout(receive_ctrl)

    # Read-only log view; wrap mode follows the "Wrap" check box.
    self.receive_text = QTextEdit()
    self.receive_text.setReadOnly(True)
    self.receive_text.setFont(FONT_CONFIG["mono"])
    self.receive_text.setLineWrapMode(QTextEdit.WidgetWidth if self.auto_wrap_check.isChecked() else QTextEdit.NoWrap)
    self.receive_text.setMinimumHeight(100)
    self.receive_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    self.auto_wrap_check.clicked.connect(self._toggle_auto_wrap)
    receive_layout.addWidget(self.receive_text)

    self.byte_count_label = QLabel("总接收字节:0 | 本次会话:0")
    self.byte_count_label.setAlignment(Qt.AlignRight)
    self.byte_count_label.setFont(FONT_CONFIG["mono"])
    self.byte_count_label.setMinimumHeight(10)
    receive_layout.addWidget(self.byte_count_label)

    # ================= Transmit panel =================
    send_group = QGroupBox("发送数据 | Transmit Data")
    send_group.setFont(FONT_CONFIG["title"])
    send_layout = QVBoxLayout(send_group)
    send_group.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Expanding)

    send_ctrl = QHBoxLayout()
    self.hex_send_check = QCheckBox("十六进制发送 Hex")
    self.auto_clear_send_check = QCheckBox("发送后清空 Clear After Send")
    self.add_newline_check = QCheckBox("自动加换行 Newline")
    self.auto_crc_check = QCheckBox("自动计算Modbus CRC")
    self.auto_crc_check.setChecked(True)
    for cb in [self.hex_send_check, self.auto_clear_send_check, self.add_newline_check, self.auto_crc_check]:
        cb.setMinimumHeight(10)
    send_ctrl.addWidget(self.hex_send_check)
    send_ctrl.addWidget(self.auto_clear_send_check)
    send_ctrl.addWidget(self.add_newline_check)
    send_ctrl.addWidget(self.auto_crc_check)
    send_ctrl.addStretch()

    # Loop-send option; the interval field is only editable while it is ticked.
    self.loop_send_check = QCheckBox("循环发送 Loop")
    self.loop_send_check.setMinimumHeight(15)
    self.loop_interval_edit = QLineEdit("1000")
    self.loop_interval_edit.setMinimumWidth(50)
    self.loop_interval_edit.setMinimumHeight(10)
    self.loop_interval_edit.setPlaceholderText("间隔(ms)")
    self.loop_interval_edit.setFont(FONT_CONFIG["mono"])
    self.loop_interval_edit.setEnabled(False)
    self.loop_send_check.clicked.connect(lambda: self.loop_interval_edit.setEnabled(self.loop_send_check.isChecked()))
    send_ctrl.addWidget(self.loop_send_check)
    send_ctrl.addWidget(QLabel("间隔:"))
    send_ctrl.addWidget(self.loop_interval_edit)
    send_layout.addLayout(send_ctrl)

    self.send_text = QTextEdit()
    self.send_text.setFont(FONT_CONFIG["mono"])
    self.send_text.setMinimumHeight(50)
    self.send_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    send_layout.addWidget(self.send_text)

    # Both buttons start disabled.
    send_btn_layout = QHBoxLayout()
    self.send_btn = QPushButton("发送 Send")
    self.send_btn.setObjectName("primaryBtn")
    self.send_btn.setMinimumWidth(50)
    self.send_btn.setMinimumHeight(10)
    self.send_btn.clicked.connect(self.send_data)
    self.send_btn.setEnabled(False)
    self.stop_loop_btn = QPushButton("停止循环 Stop")
    self.stop_loop_btn.setMinimumWidth(50)
    self.stop_loop_btn.setMinimumHeight(10)
    self.stop_loop_btn.clicked.connect(self.stop_loop_send)
    self.stop_loop_btn.setEnabled(False)
    send_btn_layout.addWidget(self.send_btn)
    send_btn_layout.addWidget(self.stop_loop_btn)
    send_btn_layout.addStretch()
    send_layout.addLayout(send_btn_layout)

    # Receive : send width ratio is 7 : 1.
    layout.addWidget(receive_group, stretch=7)
    layout.addWidget(send_group, stretch=1)
    return widget
# ===================== UI 辅助方法 =====================
def _init_modbus_table(self) -> None:
    """(Re)populate every table row with a zero-based address and the value "0".

    The address cell is selectable but not editable. Each row is given a
    height of 10 px.
    """
    for row in range(self.modbus_table.rowCount()):
        addr_item = QTableWidgetItem(f"{row:05d}")
        # No ItemIsEditable flag: the address column is read-only.
        addr_item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
        addr_item.setBackground(QColor(248, 248, 248))
        addr_item.setFont(FONT_CONFIG["mono"])
        self.modbus_table.setItem(row, 0, addr_item)

        val_item = QTableWidgetItem("0")
        val_item.setFont(FONT_CONFIG["mono"])
        self.modbus_table.setItem(row, 1, val_item)

        self.modbus_table.setRowHeight(row, 10)
def _adjust_table_rows(self) -> None:
    """Resize the data table to the quantity entered by the user (1-1000).

    Invalid input leaves the table untouched (a warning has already been
    shown by ``validate_numeric_input``).
    """
    count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
    if count is None:
        return
    self.modbus_table.setRowCount(count)
    # Re-creates all cells and applies the row height to every row.
    self._init_modbus_table()
def _on_func_code_changed(self) -> None:
    """Switch the UI between read (polling) and write mode for the selected code.

    Write codes enable the value field and the manual-write button and
    disable polling; read codes do the opposite. The start address is
    re-validated afterwards.
    """
    func_code = self.func_code_combo.currentData()
    if func_code is None:
        # No item selected (e.g. the combo box is empty): nothing to update.
        return
    is_write = FUNC_CODE_MAP[func_code][1] == "write"
    self.write_value_edit.setEnabled(is_write)
    self.manual_write_btn.setEnabled(is_write and self.is_serial_open)
    self.interval_edit.setEnabled(not is_write)
    self.f_label.setText(f"F = {func_code:02d}")
    if is_write:
        self.modbus_btn.setText("写操作需点击「执行写操作」")
        self.modbus_btn.setEnabled(False)
        if func_code in (5, 15):
            self.write_value_edit.setPlaceholderText("线圈值:0/1/True/False | 多值用逗号分隔")
        else:
            self.write_value_edit.setPlaceholderText("寄存器值:0-65535 | 多值用逗号分隔")
    else:
        self.modbus_btn.setText("开始轮询 Start Poll")
        self.modbus_btn.setEnabled(self.is_serial_open)
        self.write_value_edit.setPlaceholderText("写操作值 Value:线圈:0/1,寄存器:十进制,多值用逗号分隔")
    self._validate_addr_input()
def _validate_addr_input(self) -> None:
    """Validate the start-address field against the selected function code.

    If the address is a valid number but outside the function code's range,
    the field is reset to that range's minimum.
    """
    func_code = self.func_code_combo.currentData()
    if func_code is None:
        return
    addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
    # Compare with None explicitly: address 0 is valid and must still be checked.
    if addr is not None and not validate_modbus_addr(func_code, addr):
        self.start_addr_edit.setText(str(FUNC_CODE_MAP[func_code][3][0]))
def _on_display_mode_changed(self, text: str) -> None:
    """Re-render every value cell in the newly selected display mode.

    Args:
        text: The combo-box label; must be a key of ``DISPLAY_MODE_MAP``.
    """
    self.display_mode = DISPLAY_MODE_MAP[text]
    for row in range(self.modbus_table.rowCount()):
        val_item = self.modbus_table.item(row, 1)
        if not val_item:
            continue
        try:
            # Base 0 understands the "0x"/"0b" prefixes produced by
            # format_value(), so the prefix must stay on the text.
            raw_val = int(val_item.text().strip(), 0)
        except ValueError:
            # Cell does not hold a number (e.g. user-typed text): leave it as is.
            continue
        val_item.setText(format_value(raw_val, self.display_mode))
def _toggle_auto_wrap(self) -> None:
    """Apply the auto-wrap checkbox state to the receive log widget."""
    if self.auto_wrap_check.isChecked():
        wrap_mode = QTextEdit.WidgetWidth
    else:
        wrap_mode = QTextEdit.NoWrap
    self.receive_text.setLineWrapMode(wrap_mode)
def _update_modbus_status(self, type_: str) -> None:
    """Refresh the ID or SR status label from its input field.

    Args:
        type_: ``"id"`` updates the slave-address label, ``"interval"`` the
            scan-period label; any other value is ignored.
    """
    if type_ == "id":
        slave = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
        if slave:
            self.id_label.setText(f"ID = {slave}")
        return
    if type_ != "interval":
        return
    period = validate_numeric_input(self.interval_edit.text(), 10, 30000, "扫描周期")
    if period:
        self.sr_label.setText(f"SR = {period}ms")
def _filter_receive_log(self, filter_type: str) -> None:
    """Replace the receive log with only the lines matching ``filter_type``.

    Blank lines are always dropped. The filter works on the text currently
    shown, so lines removed by an earlier filter are not restored.

    Args:
        filter_type: "全部日志" keeps every non-blank line, "仅Modbus" keeps
            lines containing "[Modbus-", "仅串口" keeps lines containing
            "[串口-"; any other value empties the log.
    """
    tag_by_filter = {"仅Modbus": "[Modbus-", "仅串口": "[串口-"}

    def wanted(entry: str) -> bool:
        if filter_type == "全部日志":
            return True
        tag = tag_by_filter.get(filter_type)
        return tag is not None and tag in entry

    kept = [
        entry
        for entry in self.receive_text.toPlainText().split("\n")
        if entry.strip() and wanted(entry)
    ]
    self.receive_text.clear()
    self.receive_text.setText("\n".join(kept))
# ===================== 串口操作 =====================
def scan_serial_ports(self) -> None:
    """Refresh the port combo box when the list of detected ports changed.

    The previous selection is restored if that port is still present.
    """
    combo = self.serial_combo
    selected = combo.currentText()
    detected = scan_valid_ports()
    listed = [combo.itemText(idx) for idx in range(combo.count())]
    if detected == listed:
        return
    combo.clear()
    combo.addItems(detected)
    if selected and selected in detected:
        combo.setCurrentText(selected)
def toggle_serial(self) -> None:
    """Open the selected serial port, or close it when it is already open.

    Opening applies the settings chosen in the UI, starts the receive thread
    and enables the send / Modbus buttons. Closing first stops every running
    operation and then disables those buttons. Any failure while opening is
    shown in a message box.
    """
    if self.is_serial_open:
        self.stop_all_operations()
        self.serial.close()
        self.is_serial_open = False
        self.open_close_btn.setText("打开串口 Open")
        for button in (self.send_btn, self.modbus_btn, self.manual_write_btn):
            button.setEnabled(False)
        self.modbus_status_label.setText("Disconnected")
        self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 12px;")
        self.status_bar.showMessage("串口已关闭", 3000)
        return

    try:
        if not self.serial_combo.currentText():
            QMessageBox.warning(self, "串口错误", "请选择有效的串口")
            return
        # Combo entries look like "<device> - <description>"; keep the device.
        port = self.serial_combo.currentText().split(" - ")[0]
        link = self.serial
        link.port = port
        link.baudrate = int(self.baudrate_combo.currentText())
        link.bytesize = int(self.databits_combo.currentText())
        link.stopbits = float(self.stopbits_combo.currentText())
        parity_by_label = {
            "无 None": serial.PARITY_NONE,
            "奇校验 Odd": serial.PARITY_ODD,
            "偶校验 Even": serial.PARITY_EVEN,
        }
        link.parity = parity_by_label[self.parity_combo.currentText()]
        link.timeout = 0.1
        link.rtscts = self.rts_cts_check.isChecked()
        link.xonxoff = False
        link.dsrdtr = False
        link.open()

        self.is_serial_open = True
        self.open_close_btn.setText("关闭串口 Close")
        self.send_btn.setEnabled(True)
        func_code = self.func_code_combo.currentData()
        self.modbus_btn.setEnabled(FUNC_CODE_MAP[func_code][1] == "read")
        self.manual_write_btn.setEnabled(FUNC_CODE_MAP[func_code][1] == "write")
        self.modbus_status_label.setText("Connected")
        self.modbus_status_label.setStyleSheet("color: #27AE60; font-weight: bold; font-size: 12px;")
        self.status_bar.showMessage(f"串口已打开:{port} | 波特率:{self.serial.baudrate}", 3000)

        receiver = SerialReceiveThread(self.serial)
        self.receive_thread = receiver
        receiver.receive_signal.connect(self.handle_receive_data)
        receiver.error_signal.connect(lambda msg: self.status_bar.showMessage(msg, 3000))
        receiver.start()
        self.session_receive_bytes = 0
    except Exception as exc:
        QMessageBox.critical(self, "串口错误", f"打开串口失败:{str(exc)}\n请检查串口是否被占用")
# ===================== 数据收发 =====================
def handle_receive_data(self, data: bytes) -> None:
    """Count, format and append a chunk of received serial data to the log.

    Args:
        data: Raw bytes delivered by the receive thread.
    """
    self.total_receive_bytes += len(data)
    self.session_receive_bytes += len(data)
    self.byte_count_label.setText(f"总接收字节:{self.total_receive_bytes} | 本次会话:{self.session_receive_bytes}")
    timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz") if self.timestamp_check.isChecked() else ""
    if self.hex_receive_check.isChecked():
        display_data = data.hex(' ')
    else:
        # errors='replace' never raises for bytes, so the former bare
        # ``except`` fallback was unreachable and has been removed.
        display_data = data.decode('utf-8', errors='replace')
    log_prefix = f"[串口-接收][{timestamp}] " if timestamp else "[串口-接收] "
    log_line = f"{log_prefix}{display_data}\n"
    self.receive_text.append(log_line)
    if self.auto_scroll_check.isChecked():
        self.receive_text.moveCursor(self.receive_text.textCursor().End)
def send_data(self) -> None:
    """Send the contents of the send box over the open serial port.

    In hex mode the text is parsed as hex digits (whitespace ignored) and,
    if auto-CRC is enabled, a Modbus CRC-16 is appended. In text mode the
    text is UTF-8 encoded and optionally terminated with CR LF. The sent
    data is echoed to the log, and loop sending is started when requested.
    Any failure is reported in the status bar and a message box.
    """
    if not self.is_serial_open:
        QMessageBox.warning(self, "错误", "串口未打开")
        return
    try:
        send_text = self.send_text.toPlainText().strip()
        if not send_text:
            return
        if self.hex_send_check.isChecked():
            # Drop spaces and line breaks so "01 03 ..." parses as hex.
            send_hex = send_text.replace(" ", "").replace("\n", "").replace("\r", "")
            if len(send_hex) % 2 != 0:
                raise Exception("十六进制数据长度必须为偶数")
            send_data = bytes.fromhex(send_hex)
            if self.auto_crc_check.isChecked() and len(send_data) >= 2:
                # If the frame already ends in a valid CRC, strip it first so
                # the CRC is not appended twice.
                if len(send_data) >= 4 and send_data[-2:] == modbus_crc16(send_data[:-2]):
                    send_data = send_data[:-2]
                send_data += modbus_crc16(send_data)
                self.status_bar.showMessage(f"自动计算CRC:{send_data[-2:].hex()}", 2000)
        else:
            send_data = send_text.encode('utf-8')
            if self.add_newline_check.isChecked():
                send_data += b'\r\n'
        self.serial.write(send_data)
        self.status_bar.showMessage(f"发送成功:{len(send_data)} 字节", 2000)
        timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz") if self.timestamp_check.isChecked() else ""
        log_prefix = f"[串口-发送][{timestamp}] " if timestamp else "[串口-发送] "
        # The echo format follows the *receive* hex checkbox.
        if self.hex_receive_check.isChecked():
            log_data = send_data.hex(' ')
        else:
            log_data = send_data.decode('utf-8', errors='replace')
        self.receive_text.append(f"{log_prefix}{log_data}\n")
        if self.auto_clear_send_check.isChecked():
            self.send_text.clear()
        # Start the repeat timer on the first send only; an invalid interval
        # falls back to 1000 ms.
        if self.loop_send_check.isChecked() and not self.is_loop_sending:
            interval = validate_numeric_input(self.loop_interval_edit.text(), 10, 30000, "循环间隔") or 1000
            self.loop_send_timer.start(interval)
            self.is_loop_sending = True
            self.send_btn.setEnabled(False)
            self.stop_loop_btn.setEnabled(True)
            self.status_bar.showMessage(f"开始循环发送(间隔{interval}ms)", 2000)
    except Exception as e:
        err_msg = f"发送失败:{str(e)}"
        self.status_bar.showMessage(err_msg, 2000)
        QMessageBox.critical(self, "错误", err_msg)
def stop_loop_send(self) -> None:
    """Stop the repeat-send timer and restore the send / stop button states."""
    self.loop_send_timer.stop()
    self.is_loop_sending = False
    for button, enabled in ((self.send_btn, True), (self.stop_loop_btn, False)):
        button.setEnabled(enabled)
    self.status_bar.showMessage("循环发送已停止", 2000)
# ===================== Modbus 操作 =====================
def toggle_modbus_operation(self) -> None:
    """Start Modbus polling with the current form values, or stop it.

    When no poll thread is running the form fields are validated and a new
    ``ModbusThread`` is started; otherwise the running thread is stopped.
    Nothing is started when any field fails validation.
    """
    if not self.modbus_thread or not self.modbus_thread.isRunning():
        slave_id = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
        func_code = self.func_code_combo.currentData()
        start_addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
        count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
        interval = validate_numeric_input(self.interval_edit.text(), 10, 30000, "扫描周期")
        # Address 0 is valid, so it must not be tested by truthiness (the
        # old ``all([...])`` check refused to poll address 0). Assumes the
        # validator returns None on failure; the other fields have minimums
        # >= 1, so a falsy value there still means "invalid".
        if start_addr is None or not all([slave_id, func_code, count, interval]):
            return
        if not validate_modbus_addr(func_code, start_addr):
            return
        self.modbus_thread = ModbusThread(self.serial)
        self.modbus_thread.set_read_params(slave_id, func_code, start_addr, count, interval)
        self.modbus_thread.poll_result.connect(self.update_modbus_table)
        self.modbus_thread.status_update.connect(lambda msg: self.status_bar.showMessage(msg, 2000))
        self.modbus_thread.log_update.connect(self._append_modbus_log)
        self.modbus_thread.start()
        self.modbus_btn.setText("停止轮询 Stop Poll")
        self.status_bar.showMessage(f"开始Modbus轮询 | 从站{slave_id} 功能码{func_code:02d}", 2000)
    else:
        self.stop_modbus_operation()
        self.modbus_btn.setText("开始轮询 Start Poll")
        self.status_bar.showMessage("Modbus轮询已停止", 2000)
def stop_modbus_operation(self) -> None:
    """Ask a running Modbus worker thread to stop and wait until it exits."""
    worker = self.modbus_thread
    if not worker or not worker.isRunning():
        return
    worker.stop()
    worker.wait()
def update_modbus_table(self, data_list: List[int | bool], tx_count: int, err_count: int) -> None:
    """Show a poll result in the table and refresh the Tx / Err counters.

    Args:
        data_list: Values read from the slave, one per table row.
        tx_count: Number of requests sent so far.
        err_count: Number of failed requests so far.

    Rows beyond the end of ``data_list`` are reset to "0".
    """
    self.tx_label.setText(f"Tx = {tx_count}")
    self.err_label.setText(f"Err = {err_count}")
    received = len(data_list)
    for row in range(self.modbus_table.rowCount()):
        if row < received:
            cell_text = format_value(data_list[row], self.display_mode)
        else:
            cell_text = "0"
        cell = QTableWidgetItem(cell_text)
        cell.setFont(FONT_CONFIG["mono"])
        self.modbus_table.setItem(row, 1, cell)
def _append_modbus_log(self, log_type: str, content: str) -> None:
    """Append a tagged Modbus log line to the receive log.

    Args:
        log_type: Tag placed in square brackets at the start of the line.
        content: Text of the log entry.
    """
    if self.timestamp_check.isChecked():
        stamp = QDateTime.currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz")
    else:
        stamp = ""
    if stamp:
        prefix = f"[{log_type}][{stamp}] "
    else:
        prefix = f"[{log_type}] "
    self.receive_text.append(f"{prefix}{content}\n")
    if self.auto_scroll_check.isChecked():
        self.receive_text.moveCursor(self.receive_text.textCursor().End)
def execute_manual_write(self) -> None:
    """Validate the form and run a single Modbus write on a worker thread.

    The result is reported through ``handle_write_result``. Nothing is sent
    when the port is closed or any field fails validation.
    """
    if not self.is_serial_open:
        QMessageBox.warning(self, "错误", "串口未打开")
        return
    slave_id = validate_numeric_input(self.slave_id_edit.text(), 1, 247, "从站地址")
    func_code = self.func_code_combo.currentData()
    start_addr = validate_numeric_input(self.start_addr_edit.text(), 0, 65535, "起始地址")
    count = validate_numeric_input(self.count_edit.text(), 1, 1000, "数量")
    # Address 0 is valid, so it must not be tested by truthiness (the old
    # ``all([...])`` check refused to write to address 0). Assumes the
    # validator returns None on failure.
    if start_addr is None or not all([slave_id, func_code, count]):
        return
    if not validate_modbus_addr(func_code, start_addr):
        return
    write_values = parse_write_values(self.write_value_edit.text(), func_code, count)
    if not write_values:
        return
    # Let any previous worker finish before its reference is replaced;
    # dropping a QThread object that is still running is unsafe.
    self.stop_modbus_operation()
    self.modbus_thread = ModbusThread(self.serial)
    self.modbus_thread.set_write_params(slave_id, func_code, start_addr, count, write_values)
    self.modbus_thread.write_result.connect(self.handle_write_result)
    self.modbus_thread.status_update.connect(lambda msg: self.status_bar.showMessage(msg, 2000))
    self.modbus_thread.log_update.connect(self._append_modbus_log)
    self.modbus_thread.start()
def handle_write_result(self, success: bool, msg: str) -> None:
    """Show the outcome of a Modbus write in a message box.

    Args:
        success: True for an information box, False for an error box.
        msg: Text shown in the box.
    """
    if not success:
        QMessageBox.critical(self, "操作失败", msg)
        return
    QMessageBox.information(self, "操作成功", msg)
def clear_modbus_table(self) -> None:
    """Reset every value cell of the Modbus table to "0"."""
    table = self.modbus_table
    for row in range(table.rowCount()):
        blank = QTableWidgetItem("0")
        blank.setFont(FONT_CONFIG["mono"])
        table.setItem(row, 1, blank)
def clear_receive(self) -> None:
    """Empty the receive log and reset the per-session byte counter.

    The total byte counter is kept.
    """
    self.receive_text.clear()
    self.session_receive_bytes = 0
    summary = f"总接收字节:{self.total_receive_bytes} | 本次会话:{self.session_receive_bytes}"
    self.byte_count_label.setText(summary)
def copy_receive_content(self) -> None:
    """Copy the whole receive log to the system clipboard."""
    log_text = self.receive_text.toPlainText()
    QApplication.clipboard().setText(log_text)
    self.status_bar.showMessage("接收日志已复制到剪贴板", 2000)
def save_receive_log(self) -> None:
    """Ask for a file name and save the receive log there as UTF-8 text.

    Does nothing if the dialog is cancelled; write errors are shown in a
    message box.
    """
    default_name = f"modbus_log_{QDateTime.currentDateTime().toString('yyyyMMddhhmmss')}.txt"
    file_path, _ = QFileDialog.getSaveFileName(
        self, "保存日志", default_name, "Text Files (*.txt);;All Files (*)"
    )
    if not file_path:
        return
    try:
        with open(file_path, 'w', encoding='utf-8') as log_file:
            log_file.write(self.receive_text.toPlainText())
        self.status_bar.showMessage(f"日志已保存到:{file_path}", 3000)
    except Exception as exc:
        QMessageBox.critical(self, "保存失败", f"日志保存失败:{str(exc)}")
def stop_all_operations(self) -> None:
    """Stop loop sending, Modbus work and the serial receive thread, in that order."""
    self.stop_loop_send()
    self.stop_modbus_operation()
    receiver = self.receive_thread
    if not receiver or not receiver.isRunning():
        return
    receiver.stop()
    receiver.wait()
# ===================== 程序入口 =====================
if __name__ == "__main__":
    # Work around high-DPI display issues; both attributes are set before
    # the QApplication instance is created.
    QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
    QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)
    app = QApplication(sys.argv)
    # Use the default UI font application-wide.
    app.setFont(FONT_CONFIG["default"])
    window = ModbusRTUMainWindow()
    # Run the Qt event loop and use its return value as the exit status.
    sys.exit(app.exec_())
更新版
[Python] 纯文本查看 复制代码 import sys
import serial
import json
import csv
import socket
import asyncio
from datetime import datetime
from typing import List, Optional, Any, Dict, Tuple
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout,
QGroupBox, QLabel, QComboBox, QLineEdit, QPushButton, QCheckBox,
QTextEdit, QTableWidget, QTableWidgetItem, QStatusBar, QMessageBox,
QFileDialog, QHeaderView, QSizePolicy, QTabWidget, QSpinBox)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer, QDateTime
from PyQt5.QtGui import QPalette, QColor, QFont, QBrush
import bleak
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
# ===================== Constants (font tuning + widget size fallbacks) =====================
FONT_CONFIG = {
    "title": QFont("SimHei", 14, QFont.Bold),  # Title font (moderately enlarged to avoid excessive squeezing)
    "subtitle": QFont("SimHei", 12, QFont.Bold),  # Sub-control font (size fine-tuned; legibility first)
    "mono": QFont("Consolas", 11, QFont.Bold),
    "status": QFont("SimHei", 11, QFont.Bold)
}
# UI label of each display mode -> internal mode key.
DISPLAY_MODE_MAP = {
    "十进制": "dec",
    "十六进制": "hex",
    "二进制": "bin"
}
# Function code -> (UI label, "read"/"write", hex code string, (min address, max address)).
FUNC_CODE_MAP = {
    1: ("读线圈状态", "read", "0x01", (0, 65535)),
    2: ("读离散输入", "read", "0x02", (0, 65535)),
    3: ("读保持寄存器", "read", "0x03", (0, 65535)),
    4: ("读输入寄存器", "read", "0x04", (0, 65535)),
    5: ("写单个线圈", "write", "0x05", (0, 65535)),
    6: ("写单个寄存器", "write", "0x06", (0, 65535)),
    15: ("写多个线圈", "write", "0x0F", (0, 65535)),
    16: ("写多个寄存器", "write", "0x10", (0, 65535))
}
# New: data-recording constants
RECORD_FORMAT_OPTIONS = ["CSV", "JSON"]
RECORD_STATUS = {
    "STOPPED": "未记录",
    "RECORDING": "记录中"
}
# New: Bluetooth constants
BLUETOOTH_CHARACTERISTIC_UUID = "0000ffe1-0000-1000-8000-00805f9b34fb"  # Commonly used serial-service UUID
# New: network constants
NETWORK_PROTOCOLS = ["TCP客户端", "TCP服务器", "UDP"]
# ===================== 工具函数 =====================
def modbus_crc16(data: bytes) -> bytes:
    """Return the Modbus RTU CRC-16 of ``data`` as two bytes, low byte first.

    Uses initial value 0xFFFF and the reflected polynomial 0xA001, processing
    each byte least-significant bit first.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _ in range(8):
            lsb_set = register & 0x0001
            register >>= 1
            if lsb_set:
                register ^= 0xA001
    low = register & 0xFF
    high = (register >> 8) & 0xFF
    return bytes((low, high))
def xor_checksum(data: bytes) -> bytes:
    """Return a one-byte checksum: the XOR of every byte in ``data``.

    Empty input yields ``b"\\x00"``.
    """
    folded = 0x00
    for octet in data:
        folded = folded ^ octet
    return bytes((folded,))
def _add_checksum(data, checksum_type, debugger=None):
    """Return ``data`` with the requested checksum appended.

    Args:
        data: Payload bytes.
        checksum_type: 'CRC-16' appends a CRC, 'XOR' appends one XOR byte;
            any other value returns ``data`` unchanged.
        debugger: Optional object whose ``calculate_crc16`` /
            ``calculate_xor`` methods are used instead of the built-in
            calculation when it is truthy.
    """
    if checksum_type == 'CRC-16':
        crc = debugger.calculate_crc16(data) if debugger else modbus_crc16(data)
        return data + crc
    if checksum_type != 'XOR':
        return data
    if debugger:
        xor_value = debugger.calculate_xor(data)
    else:
        xor_value = 0
        for octet in data:
            xor_value ^= octet
    return data + bytes([xor_value])
def validate_numeric_input(text: str, min_val: int, max_val: int, field_name: str) -> Optional[int]:
    """Parse ``text`` as an integer within ``[min_val, max_val]``.

    Args:
        text: Raw text from an input field.
        min_val: Smallest accepted value (inclusive).
        max_val: Largest accepted value (inclusive).
        field_name: Field label used in the warning dialog.

    Returns:
        The parsed integer, or None after showing a warning dialog when the
        text is not an integer or is out of range.
    """
    try:
        number = int(text)
    except ValueError:
        QMessageBox.warning(None, "输入错误", f"{field_name}必须是有效的整数")
        return None
    if number < min_val or number > max_val:
        QMessageBox.warning(None, "输入错误", f"{field_name}必须在{min_val}-{max_val}之间")
        return None
    return number
def validate_modbus_addr(func_code: int, addr: int) -> bool:
    """Check ``addr`` against the address range registered for ``func_code``.

    Returns:
        True when the address is in range; otherwise a warning dialog is
        shown and False is returned.
    """
    lowest, highest = FUNC_CODE_MAP[func_code][3]
    if lowest <= addr <= highest:
        return True
    QMessageBox.warning(None, "地址错误", f"功能码{func_code}的地址范围是{lowest}-{highest}")
    return False
def format_value(value: int | bool, mode: str) -> str:
if isinstance(value, bool):
return "1" if value else "0"
if mode == "hex":
return f"0x{value:04X}"
elif mode == "bin":
return f"0b{value:016b}"
else:
return str(value)
def parse_write_values(text: str, func_code: int, count: int) -> Optional[List[int]]:
    """Parse the comma-separated write-value field into a list of integers.

    Args:
        text: Raw field text. Coil codes (5, 15) accept 0/1/true/false
            (case-insensitive); register codes accept integers 0-65535 in
            any base ``int(x, 0)`` understands (e.g. ``0x10``).
        func_code: Modbus write function code.
        count: Number of values expected.

    Returns:
        The parsed values, or None after showing a warning dialog when the
        input is invalid. For the single-write codes (5, 6) one value is
        enough whatever ``count`` is; it is repeated ``count`` times so the
        result still has ``count`` entries.
    """
    text = text.strip()
    if not text:
        QMessageBox.warning(None, "输入错误", "请输入要写入的值")
        return None
    parts = [p.strip() for p in text.split(",")]
    values: List[int] = []
    if func_code in (5, 15):
        for part in parts:
            token = part.lower()
            if token in ("0", "false"):
                values.append(0)
            elif token in ("1", "true"):
                values.append(1)
            else:
                QMessageBox.warning(None, "输入错误", "线圈值必须是0/1或True/False")
                return None
    else:
        for part in parts:
            try:
                val = int(part, 0)
            except ValueError as e:
                QMessageBox.warning(None, "解析错误", f"值解析失败:{str(e)}")
                return None
            if not 0 <= val <= 65535:
                QMessageBox.warning(None, "输入错误", "寄存器值必须在0-65535之间")
                return None
            values.append(val)
    # The old exemption for single writes required count == 1 and was
    # therefore dead: with count > 1 a single value was always rejected.
    if func_code in (5, 6) and len(values) == 1:
        return values * count
    if len(values) != count:
        QMessageBox.warning(None, "输入错误", f"值的数量必须为{count}个")
        return None
    return values
def scan_valid_ports() -> List[str]:
    """List the detected serial ports as "<device> - <description>" strings."""
    import serial.tools.list_ports
    return [
        f"{entry.device} - {entry.description}"
        for entry in serial.tools.list_ports.comports()
    ]
# 新增:配置文件处理函数
def save_config(config: Dict, file_path: str) -> bool:
    """Write ``config`` to ``file_path`` as indented UTF-8 JSON.

    Returns:
        True on success; False after showing a warning dialog on any error.
    """
    try:
        with open(file_path, 'w', encoding='utf-8') as handle:
            json.dump(config, handle, ensure_ascii=False, indent=4)
    except Exception as exc:
        QMessageBox.warning(None, "保存失败", f"配置保存失败: {str(exc)}")
        return False
    return True
def load_config(file_path: str) -> Optional[Dict]:
    """Read a UTF-8 JSON configuration file.

    Returns:
        The parsed content, or None after showing a warning dialog when the
        file cannot be read or parsed.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
    except Exception as exc:
        QMessageBox.warning(None, "加载失败", f"配置加载失败: {str(exc)}")
        return None
    return loaded
# 新增:数据记录函数
def write_record_to_file(file_path: str, format_type: str, timestamp: str,
                         conn_type: str, conn_info: str, data: str, is_send: bool) -> bool:
    """Append one data record to a CSV or JSON log file (Bluetooth/network/serial).

    Args:
        file_path: Target log file; created when missing.
        format_type: "CSV" appends a row (writing the header first when the
            file is missing or empty); "JSON" keeps the file as one list of
            record objects and rewrites it with the new record appended.
            Any other value writes nothing.
        timestamp: Pre-formatted time string stored with the record.
        conn_type: Connection type label.
        conn_info: Connection details.
        data: Payload text.
        is_send: True for sent data, False for received data.

    Returns:
        True on success (also for an unknown ``format_type``); False after
        showing a warning dialog on any error. An existing JSON file that
        cannot be parsed, or is not a list, is reported as an error instead
        of being silently overwritten.
    """
    import os  # Local import: only this function needs it.

    direction = "发送" if is_send else "接收"
    try:
        if format_type == "CSV":
            # A header is needed for a new file and for an existing but
            # empty one.
            needs_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0
            with open(file_path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                if needs_header:
                    writer.writerow(["时间戳", "连接类型", "连接信息", "类型", "数据"])
                writer.writerow([timestamp, conn_type, conn_info, direction, data])
        elif format_type == "JSON":
            record = {
                "timestamp": timestamp,
                "conn_type": conn_type,
                "conn_info": conn_info,
                "data_type": direction,
                "data": data
            }
            # Load the existing records; a missing or empty file starts a
            # fresh list.
            records = []
            if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                with open(file_path, 'r', encoding='utf-8') as f:
                    records = json.load(f)
                if not isinstance(records, list):
                    raise ValueError("现有JSON记录文件不是记录列表")
            records.append(record)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(records, f, ensure_ascii=False, indent=2)
        return True
    except Exception as e:
        QMessageBox.warning(None, "记录失败", f"数据记录失败: {str(e)}")
        return False
# ===================== 线程类 =====================
class SerialReceiveThread(QThread):
    """Background reader that forwards bytes arriving on a serial port.

    Signals:
        receive_signal(bytes): a chunk of received data.
        error_signal(str): message for a read error; the thread then ends.
    """
    receive_signal = pyqtSignal(bytes)
    error_signal = pyqtSignal(str)

    def __init__(self, serial_port: serial.Serial):
        super().__init__()
        self.serial = serial_port
        self.running = True

    def run(self) -> None:
        """Poll the port roughly every 10 ms until stopped or the port closes."""
        port = self.serial
        while self.running and port.is_open:
            try:
                if port.in_waiting:
                    chunk = port.read(port.in_waiting)
                    self.receive_signal.emit(chunk)
            except Exception as exc:
                self.error_signal.emit(f"接收错误:{str(exc)}")
                self.running = False
            self.msleep(10)

    def stop(self) -> None:
        """Request the polling loop to end."""
        self.running = False
class ModbusThread(QThread):
    """Worker thread that polls a Modbus RTU slave or performs one write.

    Configure it with ``set_read_params`` (cyclic read) or
    ``set_write_params`` (single write) before calling ``start()``.

    Signals:
        poll_result(list, int, int): decoded values, requests sent, errors.
        write_result(bool, str): success flag and message of a write.
        status_update(str): declared for status text; not emitted here.
        log_update(str, str): log tag and text for each frame or error.
    """
    poll_result = pyqtSignal(list, int, int)
    write_result = pyqtSignal(bool, str)
    status_update = pyqtSignal(str)
    log_update = pyqtSignal(str, str)

    def __init__(self, serial_port: serial.Serial):
        super().__init__()
        self.serial = serial_port
        self.running = False
        self.params = {}
        self.operation_type = "read"
        self.tx_count = 0
        self.err_count = 0

    def set_read_params(self, slave_id: int, func_code: int, start_addr: int, count: int, interval: int) -> None:
        """Configure a cyclic read; ``interval`` is the poll period in ms."""
        self.params = {
            "slave_id": slave_id,
            "func_code": func_code,
            "start_addr": start_addr,
            "count": count,
            "interval": interval
        }
        self.operation_type = "read"

    def set_write_params(self, slave_id: int, func_code: int, start_addr: int, count: int, values: List[int]) -> None:
        """Configure a single write of ``values`` starting at ``start_addr``."""
        self.params = {
            "slave_id": slave_id,
            "func_code": func_code,
            "start_addr": start_addr,
            "count": count,
            "values": values
        }
        self.operation_type = "write"

    def run(self) -> None:
        """Reset the counters and run the configured operation."""
        self.running = True
        self.tx_count = 0
        self.err_count = 0
        if self.operation_type == "read":
            self._run_read_loop()
        else:
            self._run_write_operation()

    def _run_read_loop(self) -> None:
        """Poll until stopped or the port closes, emitting each good result."""
        while self.running and self.serial.is_open:
            try:
                result = self._perform_read()
                if result is not None:
                    self.poll_result.emit(result, self.tx_count, self.err_count)
            except Exception as e:
                self.err_count += 1
                self.log_update.emit("Modbus-错误", f"读取失败:{str(e)}")
                self.poll_result.emit([], self.tx_count, self.err_count)
            # Sleep in 10 ms slices so stop() takes effect quickly.
            for _ in range(self.params["interval"] // 10):
                if not self.running:
                    break
                self.msleep(10)

    def _run_write_operation(self) -> None:
        """Perform the write once and report the outcome via ``write_result``."""
        try:
            success, msg = self._perform_write()
            self.write_result.emit(success, msg)
        except Exception as e:
            self.write_result.emit(False, f"写入失败:{str(e)}")

    def _perform_read(self) -> Optional[List[int | bool]]:
        """Send one read request and decode the response.

        Returns:
            Decoded values (bools for function codes 1/2, 16-bit ints for
            3/4), or None when the exchange failed. A failure is logged via
            ``log_update`` and counted in ``err_count``.
        """
        try:
            slave_id = self.params["slave_id"]
            func_code = self.params["func_code"]
            start_addr = self.params["start_addr"]
            count = self.params["count"]
            addr_bytes = start_addr.to_bytes(2, byteorder='big')
            count_bytes = count.to_bytes(2, byteorder='big')
            request = bytes([slave_id, func_code]) + addr_bytes + count_bytes
            request = _add_checksum(request, 'CRC-16')
            self.serial.write(request)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送", f"请求: {request.hex(' ')}")
            # Fixed 100 ms wait for the slave to answer.
            self.msleep(100)
            if self.serial.in_waiting == 0:
                raise Exception("无响应")
            response = self.serial.read(self.serial.in_waiting)
            self.log_update.emit("Modbus-接收", f"响应: {response.hex(' ')}")
            if len(response) < 3:
                raise Exception("响应格式错误")
            if response[0] != slave_id:
                raise Exception(f"从站ID不匹配 (预期: {slave_id}, 实际: {response[0]})")
            # High bit of the function byte marks an exception response.
            if response[1] & 0x80:
                raise Exception(f"从站错误: 异常码 {response[2]}")
            if response[1] != func_code:
                raise Exception(f"功能码不匹配 (预期: {func_code}, 实际: {response[1]})")
            byte_count = response[2]
            # Frame = address + function + byte count + data + 2 CRC bytes.
            # The old check used byte_count + 3, which ignored the CRC and
            # therefore rejected every valid response.
            if len(response) != byte_count + 5:
                raise Exception("响应长度错误")
            data_part = response[:-2]
            crc_received = response[-2:]
            crc_calculated = modbus_crc16(data_part)
            if crc_received != crc_calculated:
                raise Exception(f"CRC校验失败 (预期: {crc_calculated.hex()}, 实际: {crc_received.hex()})")
            data = []
            if func_code in [1, 2]:
                # Coils / discrete inputs: one bit each, LSB first per byte.
                for i in range(count):
                    byte_idx = i // 8
                    bit_idx = i % 8
                    if byte_idx < byte_count:
                        data.append(bool((response[3 + byte_idx] >> bit_idx) & 0x01))
            elif func_code in [3, 4]:
                # Registers: two bytes each, big-endian.
                for i in range(count):
                    if 3 + i * 2 + 1 < len(response) - 2:
                        val = (response[3 + i * 2] << 8) | response[3 + i * 2 + 1]
                        data.append(val)
            return data[:count]
        except Exception as e:
            # Count the failure here: this method swallows the exception, so
            # the handler in _run_read_loop never sees it.
            self.err_count += 1
            self.log_update.emit("Modbus-错误", str(e))
            return None

    def _perform_write(self) -> tuple[bool, str]:
        """Send one write request and check the response.

        Returns:
            ``(True, message)`` on success, ``(False, error text)`` otherwise.
        """
        try:
            slave_id = self.params["slave_id"]
            func_code = self.params["func_code"]
            start_addr = self.params["start_addr"]
            count = self.params["count"]
            values = self.params["values"]
            request = bytes([slave_id, func_code])
            request += start_addr.to_bytes(2, byteorder='big')
            if func_code == 5:
                # Single coil: 0xFF00 = ON, 0x0000 = OFF.
                value = 0xFF00 if values[0] else 0x0000
                request += value.to_bytes(2, byteorder='big')
            elif func_code == 6:
                request += values[0].to_bytes(2, byteorder='big')
            elif func_code == 15:
                request += count.to_bytes(2, byteorder='big')
                byte_count = (count + 7) // 8
                request += bytes([byte_count])
                # Pack coils LSB first into the data bytes.
                coil_bytes = bytearray(byte_count)
                for i, val in enumerate(values):
                    byte_idx = i // 8
                    bit_idx = i % 8
                    if val:
                        coil_bytes[byte_idx] |= (1 << bit_idx)
                request += coil_bytes
            elif func_code == 16:
                request += count.to_bytes(2, byteorder='big')
                request += bytes([count * 2])
                for val in values:
                    request += val.to_bytes(2, byteorder='big')
            request = _add_checksum(request, 'CRC-16')
            self.serial.write(request)
            self.tx_count += 1
            self.log_update.emit("Modbus-发送", f"请求: {request.hex(' ')}")
            # Fixed 100 ms wait for the slave to answer.
            self.msleep(100)
            if self.serial.in_waiting == 0:
                raise Exception("无响应")
            response = self.serial.read(self.serial.in_waiting)
            self.log_update.emit("Modbus-接收", f"响应: {response.hex(' ')}")
            if len(response) < 5:
                raise Exception("响应格式错误")
            if response[0] != slave_id:
                raise Exception(f"从站ID不匹配 (预期: {slave_id}, 实际: {response[0]})")
            if response[1] & 0x80:
                raise Exception(f"从站错误: 异常码 {response[2]}")
            if response[1] != func_code:
                raise Exception(f"功能码不匹配 (预期: {func_code}, 实际: {response[1]})")
            data_part = response[:-2]
            crc_received = response[-2:]
            crc_calculated = modbus_crc16(data_part)
            if crc_received != crc_calculated:
                raise Exception(f"CRC校验失败 (预期: {crc_calculated.hex()}, 实际: {crc_received.hex()})")
            return True, f"写入成功: 从站{slave_id}, 功能码{func_code}, 地址{start_addr}, 数量{count}"
        except Exception as e:
            return False, str(e)

    def stop(self) -> None:
        """Request the polling loop to end."""
        self.running = False
# 新增:数据记录线程
class DataRecordThread(QThread):
    """Thread that writes the most recently supplied data to a log file at a fixed interval.

    Signals:
        record_signal(bool, str): emitted with False and a message when a
            write fails or the thread hits an error.
    """
    record_signal = pyqtSignal(bool, str)

    def __init__(self, file_path: str, format_type: str, interval: int):
        super().__init__()
        self.file_path = file_path
        self.format_type = format_type
        self.interval = interval
        self.running = False
        self.data_buffer = None

    def set_data(self, conn_type: str, conn_info: str, data: str, is_send: bool) -> None:
        """Set the data to record; it replaces any previously set data."""
        self.data_buffer = (conn_type, conn_info, data, is_send)

    def run(self) -> None:
        """Write the buffered data once per interval until stopped."""
        self.running = True
        while self.running:
            try:
                if self.data_buffer:
                    conn_type, conn_info, data, is_send = self.data_buffer
                    # Millisecond precision: drop the last three microsecond digits.
                    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    written = write_record_to_file(
                        self.file_path, self.format_type, stamp,
                        conn_type, conn_info, data, is_send,
                    )
                    if not written:
                        self.record_signal.emit(False, "数据记录失败")
                # Wait out the interval in 10 ms slices so stop() is honoured quickly.
                remaining_slices = self.interval // 10
                while remaining_slices > 0 and self.running:
                    self.msleep(10)
                    remaining_slices -= 1
            except Exception as exc:
                self.record_signal.emit(False, f"记录线程错误: {str(exc)}")
                self.running = False

    def stop(self) -> None:
        """Request the recording loop to end."""
        self.running = False
# 新增:蓝牙扫描线程
class BluetoothScanThread(QThread):
    """Thread that scans for BLE devices for five seconds.

    Signals:
        device_found(str, str): device name and address, once per address.
        scan_finished(): emitted when the scan ends, with or without error.
        scan_error(str): error text if the scan fails.
    """
    device_found = pyqtSignal(str, str)  # device name, device address
    scan_finished = pyqtSignal()
    scan_error = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.running = False
        self.found_devices = set()

    def run(self):
        """Run one scan on a private asyncio event loop."""
        self.running = True
        self.found_devices.clear()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Pass the callback to the constructor rather than through the
            # deprecated register_detection_callback().
            scanner = BleakScanner(detection_callback=self._detection_callback)
            loop.run_until_complete(scanner.start())
            loop.run_until_complete(asyncio.sleep(5))  # scan for 5 seconds
            loop.run_until_complete(scanner.stop())
        except Exception as e:
            self.scan_error.emit(str(e))
        finally:
            loop.close()
            self.scan_finished.emit()

    def _detection_callback(self, device, advertisement_data):
        """Report each newly seen address once; ignored after stop()."""
        if self.running and device.address not in self.found_devices:
            self.found_devices.add(device.address)
            device_name = device.name if device.name else "未知设备"
            self.device_found.emit(device_name, device.address)

    def stop(self):
        """Stop reporting devices; the 5 s scan itself still runs to its end."""
        self.running = False
# 新增:蓝牙接收线程
class BluetoothReceiveThread(QThread):
    """Thread that repeatedly reads a GATT characteristic from a BLE client.

    Signals:
        data_received(bytes): non-empty data read from the characteristic.
        error_occurred(str): text of a read or thread error.
    """
    data_received = pyqtSignal(bytes)
    error_occurred = pyqtSignal(str)

    def __init__(self, client: BleakClient, char_uuid: str):
        super().__init__()
        self.client = client
        self.char_uuid = char_uuid
        self.running = False
        self.loop = asyncio.new_event_loop()

    def run(self):
        """Read the characteristic about every 10 ms until stopped or disconnected."""
        self.running = True
        asyncio.set_event_loop(self.loop)
        try:
            while self.running and self.client.is_connected:
                try:
                    data = self.loop.run_until_complete(self.client.read_gatt_char(self.char_uuid))
                    if data:
                        self.data_received.emit(data)
                except Exception as e:
                    # A read interrupted by stop() is expected; only report
                    # errors that happen while the thread should be running.
                    if self.running:
                        self.error_occurred.emit(f"接收错误: {str(e)}")
                self.msleep(10)
        except Exception as e:
            self.error_occurred.emit(f"线程错误: {str(e)}")
        finally:
            self.loop.close()

    def stop(self):
        """Request the read loop to end and interrupt a read in progress."""
        self.running = False
        # stop() is called from another thread, and event-loop methods are
        # not thread-safe, so the stop request goes through
        # call_soon_threadsafe().
        if self.loop.is_running():
            try:
                self.loop.call_soon_threadsafe(self.loop.stop)
            except RuntimeError:
                # The loop was closed in the meantime; nothing left to stop.
                pass
# 新增:网络接收线程(TCP客户端)
class TcpClientReceiveThread(QThread):
    """Thread that receives data from a connected TCP socket.

    Signals:
        data_received(bytes): a received chunk (up to 1024 bytes).
        connection_closed(): emitted when the receive loop ends.
        error_occurred(str): text of a receive error.
    """
    data_received = pyqtSignal(bytes)
    connection_closed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, sock: socket.socket):
        super().__init__()
        self.sock = sock
        self.running = False

    def run(self):
        """Receive until stopped, the peer closes, or an error occurs."""
        self.running = True
        # 1 s timeout so the loop re-checks ``running`` regularly.
        self.sock.settimeout(1.0)
        while self.running:
            try:
                chunk = self.sock.recv(1024)
                if not chunk:
                    # Empty read: the peer closed the connection.
                    break
                self.data_received.emit(chunk)
            except socket.timeout:
                continue
            except Exception as exc:
                self.error_occurred.emit(f"接收错误: {str(exc)}")
                break
        self.connection_closed.emit()

    def stop(self):
        """Request the receive loop to end."""
        self.running = False
# 新增:TCP服务器线程
class TcpServerThread(QThread):
    """Thread that listens on a TCP port and reports accepted clients.

    Signals:
        client_connected(socket, tuple): accepted socket and its
            ``(host, port)`` address.
        connection_error(str): text of a bind or accept error.
    """
    # pyqtSignal() takes Python type objects; a typing alias such as
    # Tuple[str, int] is not one and is rejected when the class is defined,
    # so the plain ``tuple`` type is used.
    client_connected = pyqtSignal(socket.socket, tuple)
    connection_error = pyqtSignal(str)

    def __init__(self, host: str, port: int):
        super().__init__()
        self.host = host
        self.port = port
        self.running = False
        self.server_sock = None

    def run(self):
        """Bind, listen and accept clients until stopped or an error occurs."""
        self.running = True
        try:
            self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_sock.bind((self.host, self.port))
            self.server_sock.listen(1)
            # 1 s timeout so the loop re-checks ``running`` regularly.
            self.server_sock.settimeout(1.0)
            while self.running:
                try:
                    client_sock, addr = self.server_sock.accept()
                    if self.running:
                        self.client_connected.emit(client_sock, addr)
                    else:
                        client_sock.close()
                except socket.timeout:
                    continue
                except Exception as e:
                    # stop() closes the listening socket, which makes
                    # accept() fail; that is a normal shutdown, not an error.
                    if self.running:
                        self.connection_error.emit(f"服务器错误: {str(e)}")
                    break
        except Exception as e:
            self.connection_error.emit(f"绑定端口失败: {str(e)}")
        finally:
            if self.server_sock:
                self.server_sock.close()

    def stop(self):
        """Request the accept loop to end and close the listening socket."""
        self.running = False
        if self.server_sock:
            self.server_sock.close()
# 新增:UDP接收线程
class UdpReceiveThread(QThread):
    """Thread that receives datagrams from a UDP socket.

    Signals:
        data_received(bytes, tuple): datagram payload and the sender's
            ``(host, port)`` address.
        error_occurred(str): text of a receive error.
    """
    # pyqtSignal() takes Python type objects; a typing alias such as
    # Tuple[str, int] is not one and is rejected when the class is defined,
    # so the plain ``tuple`` type is used.
    data_received = pyqtSignal(bytes, tuple)
    error_occurred = pyqtSignal(str)

    def __init__(self, sock: socket.socket):
        super().__init__()
        self.sock = sock
        self.running = False

    def run(self):
        """Receive datagrams until stopped or an error occurs."""
        self.running = True
        # 1 s timeout so the loop re-checks ``running`` regularly.
        self.sock.settimeout(1.0)
        while self.running:
            try:
                data, addr = self.sock.recvfrom(1024)
                if data:
                    self.data_received.emit(data, addr)
            except socket.timeout:
                continue
            except Exception as e:
                self.error_occurred.emit(f"接收错误: {str(e)}")
                break

    def stop(self):
        """Request the receive loop to end."""
        self.running = False
# ===================== 主窗口(终极显示优化) =====================
class ModbusRTUMainWindow(QMainWindow):
def __init__(self):
    """Set up window geometry, connection state, timers and counters, then build and show the UI."""
    super().__init__()
    self.setWindowTitle("多功能串口调试工具(增强版)")
    self.setGeometry(100, 100, 1600, 1200)  # Window height moderately increased
    self.setMinimumSize(1300, 1000)
    self._setup_global_style()
    # Serial-port state
    self.serial: serial.Serial = serial.Serial()
    self.receive_thread: Optional[SerialReceiveThread] = None
    self.modbus_thread: Optional[ModbusThread] = None
    # Bluetooth state
    self.bluetooth_client: Optional[BleakClient] = None
    self.bluetooth_scan_thread: Optional[BluetoothScanThread] = None
    self.bluetooth_receive_thread: Optional[BluetoothReceiveThread] = None
    self.bluetooth_devices = {}  # {address: name}
    self.is_bluetooth_connected = False
    self.bluetooth_write_char = BLUETOOTH_CHARACTERISTIC_UUID
    # Network state
    self.network_socket: Optional[socket.socket] = None
    self.tcp_server_thread: Optional[TcpServerThread] = None
    self.tcp_client_receive_thread: Optional[TcpClientReceiveThread] = None
    self.udp_receive_thread: Optional[UdpReceiveThread] = None
    self.network_protocol = "TCP客户端"
    self.client_socket: Optional[socket.socket] = None
    self.client_addr: Optional[Tuple[str, int]] = None
    self.is_network_connected = False
    # Data-recording state
    self.record_thread: Optional[DataRecordThread] = None
    self.is_recording = False
    self.record_file_path = ""
    self.record_format = "CSV"
    self.record_interval = 1000
    # Other initialisation
    # Timer that re-triggers send_data() for loop sending.
    self.loop_send_timer = QTimer()
    self.loop_send_timer.timeout.connect(self.send_data)
    # Rescan the available serial ports every 2 seconds.
    self.port_scan_timer = QTimer()
    self.port_scan_timer.timeout.connect(self.scan_serial_ports)
    self.port_scan_timer.start(2000)
    self.is_serial_open: bool = False
    self.is_loop_sending: bool = False
    self.display_mode: str = "dec"
    # Byte counters (per connection type)
    self.serial_rx_bytes = 0
    self.serial_tx_bytes = 0
    self.bluetooth_rx_bytes = 0
    self.bluetooth_tx_bytes = 0
    self.network_rx_bytes = 0
    self.network_tx_bytes = 0
    # Build the UI after all state attributes exist.
    self._init_ui()
    self.activateWindow()
    self.raise_()
    self.show()
def _setup_global_style(self):
    """Apply the application-wide colour palette and Qt style sheet to this window."""
    # Global palette: (colour role, RGB) pairs applied in order.
    palette = QPalette()
    for role, rgb in (
        (QPalette.Window, (240, 242, 245)),
        (QPalette.WindowText, (40, 44, 52)),
        (QPalette.Base, (250, 251, 252)),
        (QPalette.AlternateBase, (235, 237, 240)),
        (QPalette.Button, (230, 233, 236)),
        (QPalette.ButtonText, (40, 44, 52)),
        (QPalette.Highlight, (76, 145, 226)),
        (QPalette.HighlightedText, (255, 255, 255)),
    ):
        palette.setColor(role, QColor(*rgb))
    self.setPalette(palette)
    # Style sheet. Qt's `border-radius` accepts only one or two lengths (not
    # the CSS four-corner shorthand), so per-corner rounding is expressed with
    # the explicit `border-<corner>-radius` properties below.
    self.setStyleSheet("""
        QMainWindow {background: #F0F2F5;}
        QGroupBox {
            font-weight: bold;
            font-size: 14px;
            border: 1px solid #CED4DA;
            border-radius: 8px;
            margin-top: 30px; /* 极大增加标题区域的margin,避免内容遮挡标题 */
            padding: 30px 20px 20px 20px; /* 增大内边距,让内容远离边框 */
            background: #FFFFFF;
        }
        QGroupBox::title {
            subcontrol-origin: margin;
            subcontrol-position: top left;
            left: 30px; /* 标题左移,避免贴边 */
            top: -18px; /* 标题上移,完全脱离内容区域 */
            padding: 0 20px; /* 标题左右留足空间,避免文字挤压 */
            background: #F0F2F5;
            font-family: SimHei;
            font-weight: bold;
            font-size: 14px;
            min-width: 200px; /* 标题强制最小宽度,确保完整显示 */
        }
        QPushButton {
            background: #E8ECEF;
            border: 1px solid #CED4DA;
            border-radius: 6px;
            padding: 10px 20px;
            font-size: 12px;
            min-height: 36px;
            min-width: 130px; /* 按钮强制最小宽度,文字不换行 */
            font-family: SimHei;
            font-weight: bold;
        }
        QPushButton:hover {
            background: #D8DFE6;
            border-color: #ADB5BD;
        }
        QPushButton:disabled {
            background: #F5F7F9;
            color: #ADB5BD;
            border-color: #E2E6EA;
        }
        QPushButton#primaryBtn {
            background: #4A90E2;
            color: white;
            border: none;
            min-height: 38px;
        }
        QPushButton#primaryBtn:hover {
            background: #357ABD;
        }
        QLabel {
            font-size: 12px;
            color: #495057;
            min-height: 34px;
            min-width: 100px; /* 标签强制最小宽度,中文不截断 */
            font-family: SimHei;
            font-weight: bold;
        }
        QTableWidget {
            gridline-color: #E9ECEF;
            font-family: Consolas;
            font-size: 11px;
            border: 1px solid #E9ECEF;
            border-radius: 6px;
            min-height: 250px;
            alternate-background-color: #F8F9FA;
            font-weight: bold;
        }
        QTableWidget::item:selected {
            background: #E1F0FF;
            color: #212529;
        }
        QTableWidget QHeaderView::section {
            background: #E9ECEF;
            border: 1px solid #CED4DA;
            padding: 10px;
            font-weight: bold;
            font-size: 12px;
            color: #495057;
            font-family: SimHei;
            min-width: 80px; /* 表头强制最小宽度 */
        }
        QStatusBar {
            font-size: 11px;
            color: #495057;
            background: #E9ECEF;
            border-top: 1px solid #CED4DA;
            padding: 2px 15px;
            font-family: SimHei;
            font-weight: bold;
        }
        QLineEdit, QComboBox, QTextEdit {
            border: 1px solid #CED4DA;
            border-radius: 6px;
            padding: 8px 12px;
            font-size: 12px;
            min-height: 36px;
            min-width: 120px; /* 输入控件强制最小宽度,文字不截断 */
            background: #FFFFFF;
            font-family: SimHei;
            font-weight: bold;
        }
        QLineEdit:focus, QComboBox:focus, QTextEdit:focus {
            border: 1px solid #4A90E2;
            outline: none;
            background: #FFFFFF;
        }
        QComboBox::drop-down {
            border-left: 1px solid #CED4DA;
            border-top-right-radius: 6px;
            border-bottom-right-radius: 6px;
        }
        QComboBox QAbstractItemView {
            font-family: SimHei;
            font-size: 12px;
            font-weight: bold;
            padding: 5px;
            min-width: 200px; /* 下拉选项强制最小宽度 */
        }
        QCheckBox {
            font-size: 12px;
            color: #495057;
            min-height: 36px;
            min-width: 120px; /* 复选框强制最小宽度,文字不换行 */
            spacing: 10px;
            font-family: SimHei;
            font-weight: bold;
        }
        QTabWidget::pane {
            border: 1px solid #CED4DA;
            border-radius: 6px;
            background: #FFFFFF;
            padding: 10px;
        }
        QTabBar::tab {
            background: #E9ECEF;
            border: 1px solid #CED4DA;
            border-bottom-color: #CED4DA;
            border-top-left-radius: 6px;
            border-top-right-radius: 6px;
            padding: 8px 20px;
            font-family: SimHei;
            font-weight: bold;
            margin-right: 2px;
        }
        QTabBar::tab:selected {
            background: #FFFFFF;
            border-color: #CED4DA;
            border-bottom-color: #FFFFFF;
        }
    """)
def _init_ui(self) -> None:
    """Build the central tab widget (one page per feature area) and the status bar."""
    container = QWidget()
    self.setCentralWidget(container)
    root_layout = QVBoxLayout(container)
    root_layout.setSpacing(25)  # generous spacing between stacked widgets
    root_layout.setContentsMargins(25, 25, 25, 25)
    # NOTE(review): `container` is the layout's parent, not one of its items,
    # so this call probably has no effect — confirm before relying on it.
    root_layout.setStretchFactor(container, 1)

    tabs = QTabWidget()
    tabs.setFont(FONT_CONFIG["subtitle"])

    def add_page(title, sections):
        # One page = a margin-less vertical stack of (builder, stretch) sections.
        page = QWidget()
        page_layout = QVBoxLayout(page)
        page_layout.setSpacing(25)
        page_layout.setContentsMargins(0, 0, 0, 0)
        for build, stretch in sections:
            page_layout.addWidget(build(), stretch=stretch)
        tabs.addTab(page, title)

    # Plain serial port page (includes the Modbus section)
    add_page("普通串口", (
        (self._build_serial_config_widget, 0),
        (self._build_modbus_config_widget, 0),
        (lambda: self._build_io_widget("serial"), 1),
    ))
    # Bluetooth page
    add_page("蓝牙串口", (
        (self._build_bluetooth_config_widget, 0),
        (lambda: self._build_io_widget("bluetooth"), 1),
    ))
    # Network page
    add_page("网络串口", (
        (self._build_network_config_widget, 0),
        (lambda: self._build_io_widget("network"), 1),
    ))
    # Data recording page
    add_page("数据记录", ((self._build_data_record_widget, 0),))
    # Configuration management page
    add_page("配置管理", ((self._build_config_management_widget, 0),))
    root_layout.addWidget(tabs)

    # Status bar
    self.status_bar = QStatusBar()
    self.setStatusBar(self.status_bar)
    self.status_bar.setFont(FONT_CONFIG["status"])
    self.update_status_bar("serial")
    # React to tab switches
    tabs.currentChanged.connect(self.on_tab_changed)
# ===================== 串口配置组 =====================
def _build_serial_config_widget(self) -> QGroupBox:
    """Build the serial-port settings group box.

    Creates and stores ``serial_combo``, ``baudrate_combo``, ``databits_combo``,
    ``stopbits_combo``, ``parity_combo``, ``rts_cts_check`` and
    ``serial_open_close_btn`` on ``self`` and returns the containing group.
    """
    box = QGroupBox("串口配置 | Serial Configuration")
    box.setFont(FONT_CONFIG["title"])
    grid = QGridLayout(box)
    grid.setHorizontalSpacing(25)
    grid.setVerticalSpacing(20)
    grid.setColumnStretch(1, 3)  # the port combo column gets the most width
    grid.setColumnStretch(3, 2)
    caption_align = Qt.AlignRight | Qt.AlignVCenter

    def caption(text, row, col):
        # Right-aligned, vertically centred caption label in the given cell.
        grid.addWidget(QLabel(text), row, col, caption_align)

    # Port selector: wide, monospace, horizontally expanding
    caption("串口 Port:", 0, 0)
    port_combo = QComboBox()
    port_combo.setMinimumWidth(300)
    port_combo.setFont(FONT_CONFIG["mono"])
    port_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    self.serial_combo = port_combo
    grid.addWidget(port_combo, 0, 1)

    # Fixed-choice combos: (attribute, caption, choices, initial, row, caption column)
    combo_specs = (
        ("baudrate_combo", "波特率 Baud:",
         ["9600", "19200", "38400", "57600", "115200", "230400", "460800", "2000000"],
         "9600", 0, 2),
        ("databits_combo", "数据位 Data:", ["5", "6", "7", "8"], "8", 1, 0),
        ("stopbits_combo", "停止位 Stop:", ["1", "1.5", "2"], "1", 1, 2),
        ("parity_combo", "校验位 Parity:",
         ["无 None", "奇校验 Odd", "偶校验 Even"], "无 None", 2, 0),
    )
    for attr, text, choices, initial, row, col in combo_specs:
        caption(text, row, col)
        combo = QComboBox()
        combo.addItems(choices)
        combo.setCurrentText(initial)
        combo.setMinimumWidth(180)
        setattr(self, attr, combo)
        grid.addWidget(combo, row, col + 1)

    # Hardware flow control toggle
    caption("流控 Flow:", 2, 2)
    flow_check = QCheckBox("RTS/CTS")
    flow_check.setMinimumHeight(36)
    flow_check.setMinimumWidth(120)
    self.rts_cts_check = flow_check
    grid.addWidget(flow_check, 2, 3)

    # Open/close button, spanning all three rows in the last column
    toggle_btn = QPushButton("打开串口 Open")
    toggle_btn.setObjectName("primaryBtn")
    toggle_btn.setMinimumWidth(200)
    toggle_btn.setMinimumHeight(38)
    toggle_btn.clicked.connect(lambda: self.toggle_connection("serial"))
    self.serial_open_close_btn = toggle_btn
    grid.addWidget(toggle_btn, 0, 4, 3, 1, Qt.AlignCenter)
    return box
# 新增:蓝牙配置组
def _build_bluetooth_config_widget(self) -> QGroupBox:
    """Build the Bluetooth settings group box.

    Creates and stores ``bluetooth_device_combo``, ``bluetooth_scan_btn``,
    ``bluetooth_conn_btn`` and ``bluetooth_status_label`` on ``self`` and
    returns the containing group.
    """
    box = QGroupBox("蓝牙设置 | Bluetooth Configuration")
    box.setFont(FONT_CONFIG["title"])
    grid = QGridLayout(box)
    grid.setHorizontalSpacing(25)
    grid.setVerticalSpacing(20)

    # Row 0: device selector and scan button
    grid.addWidget(QLabel("设备 Device:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
    device_combo = QComboBox()
    device_combo.setMinimumWidth(300)
    device_combo.setFont(FONT_CONFIG["mono"])
    self.bluetooth_device_combo = device_combo
    grid.addWidget(device_combo, 0, 1)

    scan_btn = QPushButton("扫描设备 Scan")
    scan_btn.setMinimumWidth(150)
    scan_btn.setMinimumHeight(38)
    scan_btn.clicked.connect(self.scan_bluetooth_devices)
    self.bluetooth_scan_btn = scan_btn
    grid.addWidget(scan_btn, 0, 2)

    # Row 1: connect button centred across all three columns
    connect_btn = QPushButton("连接 Connect")
    connect_btn.setObjectName("primaryBtn")
    connect_btn.setMinimumWidth(200)
    connect_btn.setMinimumHeight(38)
    connect_btn.clicked.connect(lambda: self.toggle_connection("bluetooth"))
    self.bluetooth_conn_btn = connect_btn
    grid.addWidget(connect_btn, 1, 0, 1, 3, Qt.AlignCenter)

    # Row 2: connection status text (starts as "disconnected", shown in red)
    status = QLabel("未连接 | Disconnected")
    status.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
    self.bluetooth_status_label = status
    grid.addWidget(status, 2, 0, 1, 3, Qt.AlignCenter)
    return box
# 新增:网络配置组
def _build_network_config_widget(self) -> QGroupBox:
    """Build the network settings group box.

    Creates and stores ``network_protocol_combo``, ``network_ip_edit``,
    ``network_port_spin``, ``network_conn_btn`` and ``network_status_label``
    on ``self`` and returns the containing group.
    """
    box = QGroupBox("网络设置 | Network Configuration")
    box.setFont(FONT_CONFIG["title"])
    grid = QGridLayout(box)
    grid.setHorizontalSpacing(25)
    grid.setVerticalSpacing(20)
    caption_align = Qt.AlignRight | Qt.AlignVCenter

    # Protocol selector; the initial value is set before the change signal
    # is connected, so the handler does not run during construction.
    grid.addWidget(QLabel("协议 Protocol:"), 0, 0, caption_align)
    protocol_combo = QComboBox()
    protocol_combo.addItems(NETWORK_PROTOCOLS)
    protocol_combo.setCurrentText("TCP客户端")
    protocol_combo.setMinimumWidth(180)
    protocol_combo.currentTextChanged.connect(self.on_network_protocol_changed)
    self.network_protocol_combo = protocol_combo
    grid.addWidget(protocol_combo, 0, 1)

    # IP address
    grid.addWidget(QLabel("IP地址 IP:"), 0, 2, caption_align)
    ip_edit = QLineEdit("127.0.0.1")
    ip_edit.setMinimumWidth(180)
    self.network_ip_edit = ip_edit
    grid.addWidget(ip_edit, 0, 3)

    # Port number
    grid.addWidget(QLabel("端口 Port:"), 1, 0, caption_align)
    port_spin = QSpinBox()
    port_spin.setRange(1, 65535)
    port_spin.setValue(8080)
    port_spin.setMinimumWidth(180)
    self.network_port_spin = port_spin
    grid.addWidget(port_spin, 1, 1)

    # Connect button
    connect_btn = QPushButton("连接 Connect")
    connect_btn.setObjectName("primaryBtn")
    connect_btn.setMinimumWidth(200)
    connect_btn.setMinimumHeight(38)
    connect_btn.clicked.connect(lambda: self.toggle_connection("network"))
    self.network_conn_btn = connect_btn
    grid.addWidget(connect_btn, 1, 2, 1, 2, Qt.AlignCenter)

    # Connection status text (starts as "disconnected", shown in red)
    status = QLabel("未连接 | Disconnected")
    status.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
    self.network_status_label = status
    grid.addWidget(status, 2, 0, 1, 4, Qt.AlignCenter)
    return box
# ===================== Modbus配置组 =====================
def _build_modbus_config_widget(self) -> QGroupBox:
    """Build the Modbus RTU master group box.

    The group uses an 8-column grid (columns 0-7, equal stretch) holding the
    status strip, request parameters, action buttons and the result table.
    All interactive widgets are stored as attributes on ``self``.

    Returns:
        The populated ``QGroupBox``.
    """
    group = QGroupBox("Modbus 配置 | RTU Master(01/02/03/04/05/06/15/16)")
    group.setFont(FONT_CONFIG["title"])
    layout = QGridLayout(group)
    layout.setHorizontalSpacing(25)
    layout.setVerticalSpacing(20)
    for col in range(8):
        layout.setColumnStretch(col, 1)
    label_align = Qt.AlignRight | Qt.AlignVCenter

    # Row 0: connection status plus a strip of counters / current parameters
    self.modbus_status_label = QLabel("Disconnected")
    self.modbus_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
    layout.addWidget(self.modbus_status_label, 0, 0, 1, 2)
    status_widget = QWidget()
    status_layout = QHBoxLayout(status_widget)
    status_layout.setSpacing(30)
    status_layout.setContentsMargins(0, 0, 0, 0)
    self.tx_label = QLabel("Tx = 0")
    self.tx_label.setStyleSheet("color: #2980B9; font-weight: bold; font-size: 12px;")
    self.err_label = QLabel("Err = 0")
    self.err_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 12px;")
    self.id_label = QLabel("ID = 1")
    self.f_label = QLabel("F = 03")
    self.sr_label = QLabel("SR = 1000ms")
    for lbl in [self.tx_label, self.err_label, self.id_label, self.f_label, self.sr_label]:
        lbl.setFont(FONT_CONFIG["subtitle"])
        lbl.setMinimumHeight(36)
        lbl.setMinimumWidth(80)
        status_layout.addWidget(lbl)
    status_layout.addStretch()
    # Columns 2-7 only: the previous span of 8 ran past the last grid column
    # and implicitly created two extra, empty columns.
    layout.addWidget(status_widget, 0, 2, 1, 6)

    # Row 1: slave id, function code, start address, quantity
    layout.addWidget(QLabel("从站地址 ID:"), 1, 0, label_align)
    self.slave_id_edit = QLineEdit("1")
    self.slave_id_edit.setMinimumWidth(120)
    self.slave_id_edit.setMinimumHeight(36)
    self.slave_id_edit.setFont(FONT_CONFIG["mono"])
    layout.addWidget(self.slave_id_edit, 1, 1)
    layout.addWidget(QLabel("功能码 Func:"), 1, 2, label_align)
    self.func_code_combo = QComboBox()
    self.func_code_combo.setMinimumWidth(250)  # wide enough for the long item texts
    self.func_code_combo.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
    # Items are added before the signal is connected, so the handler does
    # not run while the combo is being populated.
    for code, (name, _, desc, _) in FUNC_CODE_MAP.items():
        self.func_code_combo.addItem(f"{code:02d} {name} ({desc})", code)
    self.func_code_combo.currentIndexChanged.connect(self._on_func_code_changed)
    layout.addWidget(self.func_code_combo, 1, 3)
    layout.addWidget(QLabel("起始地址 Addr:"), 1, 4, label_align)
    self.start_addr_edit = QLineEdit("0")
    self.start_addr_edit.setMinimumWidth(120)
    self.start_addr_edit.setMinimumHeight(36)
    self.start_addr_edit.setFont(FONT_CONFIG["mono"])
    layout.addWidget(self.start_addr_edit, 1, 5)
    layout.addWidget(QLabel("数量 Qty:"), 1, 6, label_align)
    self.count_edit = QLineEdit("10")
    self.count_edit.setMinimumWidth(120)
    self.count_edit.setMinimumHeight(36)
    self.count_edit.setFont(FONT_CONFIG["mono"])
    layout.addWidget(self.count_edit, 1, 7)

    # Row 2: scan interval, write value (disabled initially), display mode
    layout.addWidget(QLabel("扫描周期 MS:"), 2, 0, label_align)
    self.interval_edit = QLineEdit("1000")
    self.interval_edit.setMinimumWidth(140)
    self.interval_edit.setMinimumHeight(36)
    self.interval_edit.setFont(FONT_CONFIG["mono"])
    layout.addWidget(self.interval_edit, 2, 1)
    layout.addWidget(QLabel("写操作值 Value:"), 2, 2, label_align)
    self.write_value_edit = QLineEdit()
    self.write_value_edit.setMinimumWidth(300)
    self.write_value_edit.setMinimumHeight(36)
    self.write_value_edit.setFont(FONT_CONFIG["mono"])
    self.write_value_edit.setPlaceholderText("线圈:0/1/True/False | 寄存器:0-65535 | 多值用逗号分隔")
    self.write_value_edit.setEnabled(False)
    layout.addWidget(self.write_value_edit, 2, 3, 1, 3)
    layout.addWidget(QLabel("显示模式:"), 2, 6, label_align)
    self.display_mode_combo = QComboBox()
    self.display_mode_combo.setMinimumWidth(150)
    self.display_mode_combo.addItems(DISPLAY_MODE_MAP.keys())
    self.display_mode_combo.currentTextChanged.connect(self._on_display_mode_changed)
    layout.addWidget(self.display_mode_combo, 2, 7)

    # Row 3: action buttons, two grid columns each
    self.modbus_btn = QPushButton("开始轮询 Start Poll")
    self.modbus_btn.setObjectName("primaryBtn")
    self.modbus_btn.setMinimumWidth(200)
    self.modbus_btn.setMinimumHeight(38)
    self.modbus_btn.clicked.connect(self.toggle_modbus_operation)
    self.modbus_btn.setEnabled(False)
    layout.addWidget(self.modbus_btn, 3, 0, 1, 2)
    self.clear_modbus_btn = QPushButton("清空数据 Clear")
    self.clear_modbus_btn.setMinimumWidth(180)
    self.clear_modbus_btn.setMinimumHeight(36)
    self.clear_modbus_btn.clicked.connect(self.clear_modbus_table)
    layout.addWidget(self.clear_modbus_btn, 3, 2, 1, 2)
    self.clear_modbus_log_btn = QPushButton("清空日志 Clear Log")
    self.clear_modbus_log_btn.setMinimumWidth(180)
    self.clear_modbus_log_btn.setMinimumHeight(36)
    self.clear_modbus_log_btn.clicked.connect(self.clear_receive)
    layout.addWidget(self.clear_modbus_log_btn, 3, 4, 1, 2)
    self.manual_write_btn = QPushButton("执行写操作 Write")
    self.manual_write_btn.setObjectName("primaryBtn")
    self.manual_write_btn.setMinimumWidth(200)
    self.manual_write_btn.setMinimumHeight(38)
    self.manual_write_btn.clicked.connect(self.execute_manual_write)
    self.manual_write_btn.setEnabled(False)
    layout.addWidget(self.manual_write_btn, 3, 6, 1, 2)

    # Row 4: result table spanning the full grid width
    self.modbus_table = QTableWidget()
    self.modbus_table.setColumnCount(2)
    self.modbus_table.setHorizontalHeaderLabels(["Address (Dec)", "Value"])
    self.modbus_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
    self.modbus_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
    self.modbus_table.setRowCount(10)
    self.modbus_table.setAlternatingRowColors(True)
    self.modbus_table.setMinimumHeight(300)
    self.modbus_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    self._init_modbus_table()
    layout.addWidget(self.modbus_table, 4, 0, 1, 8)
    return group
# ===================== 收发组(支持串口/蓝牙/网络) =====================
def _build_io_widget(self, conn_type: str) -> QWidget:
    """Build the receive-log / transmit panel for one connection type.

    Every control is stored on ``self`` under an attribute named
    ``f"{conn_type}_<suffix>"`` (for example ``serial_receive_text`` or
    ``network_send_btn``).

    Args:
        conn_type: Attribute prefix for the created controls; the callers
            in this file pass "serial", "bluetooth" or "network".

    Returns:
        A widget holding the receive group (stretch 8) next to the send
        group (stretch 2).
    """
    widget = QWidget()
    layout = QHBoxLayout(widget)
    layout.setSpacing(25)

    # ---- Receive log group ----
    receive_group = QGroupBox(f"接收日志 | Receive Log ({conn_type.upper()})")
    receive_group.setFont(FONT_CONFIG["title"])
    receive_layout = QVBoxLayout(receive_group)
    receive_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    receive_ctrl = QHBoxLayout()
    receive_ctrl.setSpacing(20)
    # Attribute names under which the receive controls are stored
    hex_check_name = f"{conn_type}_hex_receive_check"
    timestamp_check_name = f"{conn_type}_timestamp_check"
    auto_wrap_check_name = f"{conn_type}_auto_wrap_check"
    auto_scroll_check_name = f"{conn_type}_auto_scroll_check"
    receive_text_name = f"{conn_type}_receive_text"
    clear_receive_btn_name = f"{conn_type}_clear_receive_btn"
    byte_count_label_name = f"{conn_type}_byte_count_label"
    # Display options; everything except HEX display starts checked
    hex_check = QCheckBox("十六进制显示 HEX")
    timestamp_check = QCheckBox("显示时间戳 Time")
    timestamp_check.setChecked(True)
    auto_wrap_check = QCheckBox("自动换行 Wrap")
    auto_wrap_check.setChecked(True)
    auto_scroll_check = QCheckBox("自动滚屏 Scroll")
    auto_scroll_check.setChecked(True)
    setattr(self, hex_check_name, hex_check)
    setattr(self, timestamp_check_name, timestamp_check)
    setattr(self, auto_wrap_check_name, auto_wrap_check)
    setattr(self, auto_scroll_check_name, auto_scroll_check)
    for cb in (hex_check, timestamp_check, auto_wrap_check, auto_scroll_check):
        cb.setMinimumHeight(36)
        cb.setMinimumWidth(150)
        receive_ctrl.addWidget(cb)
    receive_ctrl.addStretch(1)
    # Clear button; the text widget is looked up by name at click time
    clear_receive_btn = QPushButton("清空接收 Clear")
    clear_receive_btn.setMinimumWidth(150)
    clear_receive_btn.setMinimumHeight(36)
    clear_receive_btn.clicked.connect(lambda: getattr(self, receive_text_name).clear())
    setattr(self, clear_receive_btn_name, clear_receive_btn)
    receive_ctrl.addWidget(clear_receive_btn)
    receive_layout.addLayout(receive_ctrl)
    # Read-only log view; wrap mode follows the "Wrap" checkbox
    receive_text = QTextEdit()
    receive_text.setReadOnly(True)
    receive_text.setFont(FONT_CONFIG["mono"])
    receive_text.setLineWrapMode(QTextEdit.WidgetWidth if auto_wrap_check.isChecked() else QTextEdit.NoWrap)
    receive_text.setMinimumHeight(350)
    receive_text.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    auto_wrap_check.clicked.connect(lambda: receive_text.setLineWrapMode(
        QTextEdit.WidgetWidth if auto_wrap_check.isChecked() else QTextEdit.NoWrap))
    setattr(self, receive_text_name, receive_text)
    receive_layout.addWidget(receive_text)
    # Rx/Tx byte counter label
    byte_count_label = QLabel("接收: 0 字节 | 发送: 0 字节")
    byte_count_label.setAlignment(Qt.AlignRight)
    byte_count_label.setFont(FONT_CONFIG["subtitle"])
    byte_count_label.setMinimumHeight(36)
    byte_count_label.setMinimumWidth(300)
    setattr(self, byte_count_label_name, byte_count_label)
    receive_layout.addWidget(byte_count_label)

    # ---- Transmit group ----
    send_group = QGroupBox(f"发送数据 | Transmit Data ({conn_type.upper()})")
    send_group.setFont(FONT_CONFIG["title"])
    send_layout = QVBoxLayout(send_group)
    send_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    send_ctrl = QHBoxLayout()
    send_ctrl.setSpacing(15)
    # Attribute names under which the send controls are stored
    hex_send_check_name = f"{conn_type}_hex_send_check"
    send_newline_check_name = f"{conn_type}_send_newline_check"
    auto_send_check_name = f"{conn_type}_auto_send_check"
    send_interval_spin_name = f"{conn_type}_send_interval_spin"
    send_btn_name = f"{conn_type}_send_btn"
    clear_send_btn_name = f"{conn_type}_clear_send_btn"
    send_text_name = f"{conn_type}_send_text"
    hex_send_check = QCheckBox("HEX发送")
    hex_send_check.setMinimumHeight(36)
    send_newline_check = QCheckBox("发送新行")
    send_newline_check.setMinimumHeight(36)
    auto_send_check = QCheckBox("自动发送")
    auto_send_check.setMinimumHeight(36)
    setattr(self, hex_send_check_name, hex_send_check)
    setattr(self, send_newline_check_name, send_newline_check)
    setattr(self, auto_send_check_name, auto_send_check)
    send_ctrl.addWidget(hex_send_check)
    send_ctrl.addWidget(send_newline_check)
    send_ctrl.addWidget(auto_send_check)
    send_ctrl.addWidget(QLabel("间隔(ms):"))
    # The interval is only editable while "auto send" is checked
    send_interval_spin = QSpinBox()
    send_interval_spin.setRange(10, 60000)
    send_interval_spin.setValue(1000)
    send_interval_spin.setEnabled(False)
    send_interval_spin.setMinimumWidth(100)
    auto_send_check.toggled.connect(lambda checked: send_interval_spin.setEnabled(checked))
    setattr(self, send_interval_spin_name, send_interval_spin)
    send_ctrl.addWidget(send_interval_spin)
    send_layout.addLayout(send_ctrl)
    # Outgoing text editor
    send_text = QTextEdit()
    send_text.setFont(FONT_CONFIG["mono"])
    send_text.setLineWrapMode(QTextEdit.WidgetWidth)
    send_text.setMinimumHeight(150)
    setattr(self, send_text_name, send_text)
    send_layout.addWidget(send_text)
    # Send (disabled initially) and clear buttons
    send_btn_layout = QHBoxLayout()
    send_btn = QPushButton("发送 Send")
    send_btn.setObjectName("primaryBtn")
    send_btn.setMinimumWidth(150)
    send_btn.setMinimumHeight(38)
    send_btn.clicked.connect(lambda: self.send_data(conn_type))
    send_btn.setEnabled(False)
    setattr(self, send_btn_name, send_btn)
    clear_send_btn = QPushButton("清空 Clear")
    clear_send_btn.setMinimumWidth(150)
    clear_send_btn.setMinimumHeight(36)
    clear_send_btn.clicked.connect(lambda: getattr(self, send_text_name).clear())
    setattr(self, clear_send_btn_name, clear_send_btn)
    send_btn_layout.addWidget(send_btn)
    send_btn_layout.addWidget(clear_send_btn)
    send_layout.addLayout(send_btn_layout)

    # Stretch factors are passed with the widgets: calling setStretch(index, ...)
    # before the items exist (as the code did previously) targets indices that
    # are not in the layout yet, so the 8:2 split was never applied.
    layout.addWidget(receive_group, 8)
    layout.addWidget(send_group, 2)
    return widget
# ===================== 数据记录功能界面 =====================
def _build_data_record_widget(self) -> QGroupBox:
group = QGroupBox("数据记录配置 | Data Recording")
group.setFont(FONT_CONFIG["title"])
layout = QGridLayout(group)
layout.setHorizontalSpacing(25)
layout.setVerticalSpacing(20)
# 记录文件设置
layout.addWidget(QLabel("记录文件路径:"), 0, 0, Qt.AlignRight | Qt.AlignVCenter)
self.record_file_edit = QLineEdit()
self.record_file_edit.setMinimumWidth(400)
self.record_file_edit.setReadOnly(True)
layout.addWidget(self.record_file_edit, 0, 1)
self.browse_record_btn = QPushButton("浏览...")
self.browse_record_btn.setMinimumWidth(100)
self.browse_record_btn.clicked.connect(self.browse_record_file)
layout.addWidget(self.browse_record_btn, 0, 2)
# 记录格式设置
layout.addWidget(QLabel("记录格式:"), 1, 0, Qt.AlignRight | Qt.AlignVCenter)
self.record_format_combo = QComboBox()
self.record_format_combo.addItems(RECORD_FORMAT_OPTIONS)
self.record_format_combo.setCurrentText(self.record_format)
self.record_format_combo.currentTextChanged.connect(self._on_record_format_changed)
layout.addWidget(self.record_format_combo, 1, 1)
# 记录间隔设置
layout.addWidget(QLabel("记录间隔(ms):"), 1, 2, Qt.AlignRight | Qt.AlignVCenter)
self.record_interval_edit = QLineEdit(str(self.record_interval))
self.record_interval_edit.setMinimumWidth(150)
layout.addWidget(self.record_interval_edit, 1, 3)
# 记录状态和控制
layout.addWidget(QLabel("记录状态:"), 2, 0, Qt.AlignRight | Qt.AlignVCenter)
self.record_status_label = QLabel(RECORD_STATUS["STOPPED"])
self.record_status_label.setStyleSheet("color: #E74C3C; font-weight: bold; font-size: 14px;")
layout.addWidget(self.record_status_label, 2, 1)
self.record_control_btn = QPushButton("开始记录")
self.record_control_btn.setObjectName("primaryBtn")
self.record_control
|
免费评分
-
查看全部评分
|