好友
阅读权限10
听众
最后登录1970-1-1
|
import ctypes
import datetime
import html
import json
import logging
import os
import sys
import time

import requests
from PyQt6.QtCore import QTimer, Qt, QThread, pyqtSignal
# QShortcut lives in QtGui in Qt 6 (it was in QtWidgets in Qt 5).
from PyQt6.QtGui import (
    QFont, QGuiApplication, QKeySequence, QPixmap, QShortcut,
)
from PyQt6.QtWidgets import (
    QApplication, QLabel, QLayout, QMainWindow, QMessageBox,
    QPushButton, QVBoxLayout, QWidget,
)
class DataFetcher(QThread):
    """Worker thread that periodically polls the Sina quote endpoint.

    Signals:
        data_updated (list): emitted after a poll that produced at least one
            quote. Each item is a dict with the keys '代码' (code), '名称'
            (name), '现价' (current price) and '涨幅' (percent change,
            rounded to two decimals).
        error_occurred (str): emitted with a display-ready message when a
            request fails or no quote could be parsed from the response.
    """

    data_updated = pyqtSignal(list)
    error_occurred = pyqtSignal(str)

    _REQUEST_TIMEOUT_S = 8
    _DEFAULT_INTERVAL_S = 3
    # The pause between polls is slept in slices of this length so that
    # stop() is honoured quickly instead of after a whole interval.
    _SLEEP_SLICE_MS = 100

    def __init__(self, stock_codes, interval=3):
        """Store the polling parameters; the thread starts with start().

        Args:
            stock_codes: iterable of stock codes (converted with str()).
            interval: pause between two polls, in seconds.
        """
        super().__init__()
        self.stock_codes = stock_codes
        self.interval = interval
        self.run_flag = True

    def get_market_prefix(self, code):
        """Return *code* with the exchange prefix chosen from its first digit.

        '6'/'5' -> 'sh' (Shanghai), '0'/'3'/'1' -> 'sz' (Shenzhen),
        '8'/'4'/'9' -> 'bj' (Beijing Stock Exchange). Any other code is
        returned stripped but otherwise unchanged.
        """
        code = str(code).strip()
        if code.startswith(('6', '5')):
            return f"sh{code}"
        if code.startswith(('0', '3', '1')):
            return f"sz{code}"
        if code.startswith(('8', '4', '9')):
            return f"bj{code}"
        return code

    def run(self):
        """Poll the endpoint until stop() clears ``run_flag``."""
        query_codes = [self.get_market_prefix(c) for c in self.stock_codes]
        url = f"http://hq.sinajs.cn/list={','.join(query_codes)}"
        # Browser-like headers, sent with every request.
        headers = {
            'Referer': 'https://finance.sina.com.cn/stock/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        while self.run_flag:
            try:
                response = requests.get(
                    url, headers=headers, timeout=self._REQUEST_TIMEOUT_S
                )
                # Let requests guess the encoding from the payload.
                response.encoding = response.apparent_encoding
                data = self._parse_response(response.text)
                if not self.run_flag:
                    # stop() was requested while the request was in flight;
                    # do not deliver results to a receiver that is shutting down.
                    break
                if data:
                    self.data_updated.emit(data)
                else:
                    self.error_occurred.emit("数据被拦截或解析为空")
            # Timeout must be tested before ConnectionError: a connect
            # timeout is a subclass of both.
            except requests.exceptions.Timeout:
                self.error_occurred.emit("网络请求超时,请检查网络")
            except requests.exceptions.ConnectionError:
                self.error_occurred.emit("网络连接失败,请检查网络")
            except Exception as e:
                # Last-resort boundary: an exception escaping run() would
                # terminate the polling thread for good.
                self.error_occurred.emit(f"网络异常: {str(e)[:45]}")
            self._sleep_between_polls()

    def _parse_response(self, text):
        """Return the list of quote dicts found in a response body."""
        quotes = []
        # splitlines() also drops a trailing '\r', which would otherwise
        # survive the '";' strip of the payload.
        for line in text.splitlines():
            quote = self._parse_quote_line(line)
            if quote is not None:
                quotes.append(quote)
        return quotes

    @staticmethod
    def _parse_quote_line(line):
        """Parse one ``var hq_str_<code>="f0,f1,...";`` line.

        Fields are read by position: 0 is the name, 2 the previous close and
        3 the current price. Returns the quote dict, or None when the line is
        not a quote line, has an empty payload, has fewer than four fields or
        carries non-numeric prices.
        """
        line = line.strip()
        if not line.startswith('var hq_str_'):
            return None
        key, sep, payload = line.partition('=')
        if not sep:
            return None
        info_str = payload.strip().strip('";')
        if not info_str:
            return None
        info = info_str.split(',')
        if len(info) <= 3:
            return None
        try:
            pre_close = float(info[2])
            current = float(info[3])
        except ValueError:
            return None
        # A zero price with a known previous close is shown as the previous
        # close (presumably the feed reports 0 before the first trade).
        if current == 0 and pre_close != 0:
            current = pre_close
        pct = 0.0
        if pre_close > 0:
            pct = (current - pre_close) / pre_close * 100
        return {
            '代码': key.strip()[-6:],
            '名称': info[0],
            '现价': current,
            '涨幅': round(pct, 2),
        }

    def _interval_ms(self):
        """Return the poll interval as a positive int of milliseconds.

        ``self.interval`` may come straight from a config file, so floats and
        numeric strings are accepted; unusable values fall back to the
        default interval. msleep() requires an int, which a float interval
        multiplied by 1000 is not.
        """
        try:
            seconds = float(self.interval)
        except (TypeError, ValueError):
            seconds = self._DEFAULT_INTERVAL_S
        if not 0 < seconds < float('inf'):  # also rejects NaN
            seconds = self._DEFAULT_INTERVAL_S
        return max(1, int(seconds * 1000))

    def _sleep_between_polls(self):
        """Sleep for one interval, waking early once ``run_flag`` is cleared."""
        remaining_ms = self._interval_ms()
        while self.run_flag and remaining_ms > 0:
            step = min(self._SLEEP_SLICE_MS, remaining_ms)
            self.msleep(step)
            remaining_ms -= step

    def stop(self):
        """Ask the polling loop to end and block until the thread finishes."""
        self.run_flag = False
        self.wait()
class HotkeyWindow(QWidget):
    """Frameless, always-on-top popup that displays the quote text.

    The window can be dragged with the left mouse button, copies a screenshot
    of itself to the clipboard on a left double-click, and carries a small
    close button that quits the whole application.
    """

    # How long the "copied" notice stays visible after a double-click.
    _TIP_DURATION_MS = 1000

    def __init__(self, position_callback=None):
        """Build the popup.

        Args:
            position_callback: optional callable ``(x, y)`` invoked with the
                window position whenever the left mouse button is released.
        """
        super().__init__()
        self.position_callback = position_callback
        # Offset between the cursor and the window's top-left corner while a
        # drag is in progress; None when no drag is active.
        self.drag_start_position = None

        self.setWindowFlags(
            Qt.WindowType.Tool |
            Qt.WindowType.FramelessWindowHint |
            Qt.WindowType.WindowStaysOnTopHint |
            Qt.WindowType.X11BypassWindowManagerHint
        )
        self.resize(220, 100)
        self.setStyleSheet("background-color: #f5f5f5; border: 1px solid #cccccc;")

        # NOTE(review): this attribute shadows QWidget.layout(); it is kept
        # under its original name because external code may read it.
        self.layout = QVBoxLayout(self)
        self.layout.setContentsMargins(15, 10, 15, 10)

        self.label = QLabel("")
        font = QFont("Microsoft YaHei", 10, QFont.Weight.Bold)
        self.label.setFont(font)
        self.label.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignLeft)
        self.label.setTextFormat(Qt.TextFormat.RichText)
        # Let mouse events fall through the label so the window handlers
        # below receive them.
        self.label.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents)
        self.layout.addWidget(self.label)

        # Close button, placed inside the 220 px wide window.
        self.close_btn = QPushButton("X", self)
        self.close_btn.setGeometry(195, 5, 20, 20)
        self.close_btn.setStyleSheet("""
            QPushButton { border: none; font-weight: bold; color: #999999; font-size: 12px; }
            QPushButton:hover { color: red; background-color: #eeeeee; border-radius: 10px; }
        """)
        self.close_btn.setCursor(Qt.CursorShape.PointingHandCursor)
        self.close_btn.clicked.connect(QApplication.instance().quit)

        # One reusable notice label and timer. Creating a fresh child label
        # per double-click would leave every old label alive as a hidden
        # child of this window.
        self._tip_label = QLabel("截图已复制", self)
        self._tip_label.setStyleSheet("color: #666666; background-color: #ffffff; padding: 2px 8px; border-radius: 4px;")
        self._tip_label.hide()
        self._tip_timer = QTimer(self)
        self._tip_timer.setSingleShot(True)
        self._tip_timer.timeout.connect(self._tip_label.hide)

    def update_content(self, html_text, needed_height):
        """Show *html_text* and resize the window to *needed_height* pixels."""
        self.label.setText(html_text)
        if needed_height != self.height():
            self.resize(self.width(), needed_height)

    def mousePressEvent(self, event):
        """Start a drag: remember where inside the window the press landed."""
        if event.button() == Qt.MouseButton.LeftButton:
            self.drag_start_position = (
                event.globalPosition().toPoint() - self.frameGeometry().topLeft()
            )
            event.accept()

    def mouseMoveEvent(self, event):
        """Move the window along with the cursor while the left button is held."""
        left_held = bool(event.buttons() & Qt.MouseButton.LeftButton)
        if left_held and self.drag_start_position is not None:
            self.move(event.globalPosition().toPoint() - self.drag_start_position)
            event.accept()

    def mouseReleaseEvent(self, event):
        """End the drag and report the window position to the callback."""
        if event.button() == Qt.MouseButton.LeftButton:
            self.drag_start_position = None
            if self.position_callback:
                self.position_callback(self.x(), self.y())
            event.accept()

    def mouseDoubleClickEvent(self, event):
        """Copy a screenshot of the window to the clipboard and show a notice."""
        if event.button() == Qt.MouseButton.LeftButton:
            # Hide a notice left over from a previous double-click so it does
            # not end up in the screenshot.
            self._tip_timer.stop()
            self._tip_label.hide()

            pixmap = self.grab()
            clipboard = QApplication.clipboard()
            clipboard.clear()
            clipboard.setPixmap(pixmap)

            # Briefly confirm that the screenshot was copied.
            self._tip_label.move(int(self.width() / 2 - 30), int(self.height() / 2 - 10))
            self._tip_label.raise_()
            self._tip_label.show()
            self._tip_timer.start(self._TIP_DURATION_MS)
            event.accept()
class StealthMonitor(QMainWindow):
    """Almost transparent main window that owns the config, fetcher and popup.

    The window itself is frameless, kept at the bottom of the window stack
    and drawn at 1% opacity; the latest quotes scroll through its window
    title. A shortcut toggles a separate HotkeyWindow popup that shows the
    same quotes.
    """

    CONFIG_PATH = 'config.json'
    _DEBOUNCE_S = 0.2

    def __init__(self):
        super().__init__()
        # Latest rendered state, shared between this window and the popup.
        self.memory_data = {
            "html_view": "正在连接数据...",
            "marquee_view": "初始化中... ",
            "needed_height": 100,
            "has_valid_data": False
        }
        self.scroll_index = 0
        self.last_show_time = 0
        self.hk_window_visible = False  # mirrors the popup's visibility
        self.load_config()
        self.hk_window = HotkeyWindow(position_callback=self.save_new_position)
        self.init_ui()

        interval = self.config.get('refresh_interval', 3)
        self.fetcher = DataFetcher(self.config['stocks'], interval)
        # Queued connections: the slots run on the GUI thread.
        self.fetcher.data_updated.connect(self.update_memory_and_ui, Qt.ConnectionType.QueuedConnection)
        self.fetcher.error_occurred.connect(self.handle_error, Qt.ConnectionType.QueuedConnection)
        self.fetcher.start()

        # Advance the title marquee every 400 ms.
        self.scroll_timer = QTimer(self)
        self.scroll_timer.timeout.connect(self.scroll_title)
        self.scroll_timer.start(400)

        # Register the toggle shortcut.
        # NOTE(review): QShortcut only fires while one of this application's
        # windows is active; it is not a system-wide hotkey.
        try:
            # "win" in the config is spelled "meta" in Qt key sequences.
            key_seq = QKeySequence(str(self.hotkey).replace("win", "meta"))
            if key_seq.isEmpty():
                raise ValueError(f"无法解析 {self.hotkey!r}")
            self.hotkey_shortcut = QShortcut(key_seq, self)
            # Application-wide context so the shortcut also works while the
            # popup, a separate top-level window, has the focus.
            self.hotkey_shortcut.setContext(Qt.ShortcutContext.ApplicationShortcut)
            self.hotkey_shortcut.activated.connect(self.toggle_hotkey_window)
        except Exception as e:
            self.handle_error(f"热键注册失败: {str(e)}")

    def load_config(self):
        """Load ``config.json``, creating it with defaults when it is missing.

        Sets ``self.config`` (always a dict containing every default key),
        ``self.hotkey``, ``self.pos_x`` and ``self.pos_y``. An unreadable
        file, invalid JSON or a JSON document that is not an object all fall
        back to the defaults; unusable ``stocks`` / ``refresh_interval``
        values are replaced by their defaults.
        """
        config_path = self.CONFIG_PATH
        default_config = {
            "stocks": ["601698", "600408", "300001", "001280"],
            "refresh_interval": 3,
            "hotkey": "alt+meta",  # Qt spelling; "meta" is the Windows key
            "position_x": 1110,
            "position_y": 1190
        }
        if not os.path.exists(config_path):
            try:
                with open(config_path, 'w', encoding='utf-8') as f:
                    json.dump(default_config, f, indent=4, ensure_ascii=False)
            except OSError as e:
                QMessageBox.warning(None, "警告", f"创建配置文件失败: {str(e)}")

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSON/Unicode decode errors
            loaded = None
        # Valid JSON need not be an object (e.g. a list); setdefault() below
        # requires a dict.
        self.config = loaded if isinstance(loaded, dict) else {}

        for key, value in default_config.items():
            self.config.setdefault(key, value)

        # The stock list must be a non-empty sequence of codes.
        stocks = self.config["stocks"]
        if isinstance(stocks, str):
            stocks = [stocks]
        if not isinstance(stocks, (list, tuple)) or not stocks:
            stocks = default_config["stocks"]
        self.config["stocks"] = [str(code) for code in stocks]

        # The interval is handed to the fetcher as whole seconds (>= 1).
        try:
            interval = int(round(float(self.config["refresh_interval"])))
        except (TypeError, ValueError, OverflowError):
            interval = default_config["refresh_interval"]
        if interval < 1:
            interval = default_config["refresh_interval"]
        self.config["refresh_interval"] = interval

        self.hotkey = self.config["hotkey"]
        self.pos_x = self.config["position_x"]
        self.pos_y = self.config["position_y"]

    def save_new_position(self, x, y):
        """Clamp (x, y) to the primary screen and persist it to the config.

        The position is clamped so that the popup, at its current size, stays
        inside the available geometry of the primary screen. Persisting is
        best effort: a write failure is logged and otherwise ignored.
        """
        screen_rect = QGuiApplication.primaryScreen().availableGeometry()
        width = self.hk_window.width()
        height = self.hk_window.height()
        self.pos_x = max(screen_rect.left(), min(x, screen_rect.right() - width))
        self.pos_y = max(screen_rect.top(), min(y, screen_rect.bottom() - height))
        self.config["position_x"] = self.pos_x
        self.config["position_y"] = self.pos_y
        try:
            with open(self.CONFIG_PATH, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, indent=4, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: the config holds something json cannot
            # serialise. The in-memory position is still updated.
            logging.getLogger(__name__).warning("could not save window position: %s", e)

    def init_ui(self):
        """Build the main window: frameless, bottom-most, 1% opaque."""
        self.setWindowFlags(Qt.WindowType.FramelessWindowHint | Qt.WindowType.WindowStaysOnBottomHint)
        self.setWindowOpacity(0.01)
        self.resize(220, 180)
        self.central_widget = QWidget()
        self.central_widget.setStyleSheet("background-color: #f5f5f5;")
        self.setCentralWidget(self.central_widget)
        # NOTE(review): this attribute shadows QWidget.layout(); it is kept
        # under its original name because external code may read it.
        self.layout = QVBoxLayout(self.central_widget)
        self.layout.setContentsMargins(15, 10, 15, 10)
        self.label = QLabel(self.memory_data["html_view"])
        font = QFont("Microsoft YaHei", 10, QFont.Weight.Bold)
        self.label.setFont(font)
        self.label.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignLeft)
        self.label.setTextFormat(Qt.TextFormat.RichText)
        self.layout.addWidget(self.label)

    def update_memory_and_ui(self, data):
        """Render freshly fetched quotes into the label, title and popup.

        Args:
            data: list of dicts with the keys '代码', '名称', '现价', '涨幅'.
        """
        preview_lines = []
        marquee_parts = []
        now = datetime.datetime.now()
        time_str = f"{now.year}.{now.month}.{now.day} {now.strftime('%H:%M:%S')}"
        timestamp_html = f'<span style="color:#888888; font-size:11px;">{time_str}</span>'

        for stock in data:
            pct = stock['涨幅']
            symbol = "+" if pct > 0 else ""
            quote_text = f"{stock['名称']} {stock['现价']:.3f} ({symbol}{pct}%)"
            marquee_parts.append(quote_text)

            # Red for a gain, green for a loss, black when unchanged.
            if pct > 0:
                color = "red"
            elif pct < 0:
                color = "green"
            else:
                color = "black"
            line_text = f"{stock['代码']} {quote_text}"
            # Code and name come from the network and are shown in a
            # rich-text label: escape them so markup cannot be injected.
            safe_text = html.escape(line_text, quote=False)
            preview_lines.append(f'<span style="color:{color};">{safe_text}</span>')

        content_body = "<br><br>".join(preview_lines)
        self.memory_data["html_view"] = f"{timestamp_html}<br><br>{content_body}"
        self.memory_data["marquee_view"] = " | ".join(marquee_parts) + " | "
        self.memory_data["needed_height"] = 55 + len(data) * 35
        self.memory_data["has_valid_data"] = True

        self.label.setText(self.memory_data["html_view"])
        self.resize(self.width(), self.memory_data["needed_height"])
        self.repaint()
        if self.hk_window.isVisible():
            self.hk_window.update_content(self.memory_data["html_view"], self.memory_data["needed_height"])

    def handle_error(self, err_msg):
        """Show *err_msg* in red, but only while no valid data has arrived yet."""
        if not self.memory_data["has_valid_data"]:
            # Exception texts often contain '<...>' (object reprs), which a
            # rich-text label would swallow as a tag.
            safe_msg = html.escape(str(err_msg), quote=False)
            err_html = f'<span style="color:red; font-size:12px;">{safe_msg}</span>'
            self.memory_data["html_view"] = err_html
            self.label.setText(err_html)
            self.repaint()

    def scroll_title(self):
        """Rotate the marquee text by one character in the window title."""
        marquee = self.memory_data["marquee_view"]
        if not marquee or len(marquee) <= 1:
            return
        # The marquee may have been replaced by a shorter one since the last tick.
        if self.scroll_index >= len(marquee):
            self.scroll_index = 0
        display_text = marquee[self.scroll_index:] + marquee[:self.scroll_index]
        self.setWindowTitle(display_text)
        self.scroll_index = (self.scroll_index + 1) % len(marquee)

    def toggle_hotkey_window(self):
        """Show the popup when hidden, hide it when shown (debounced)."""
        # Monotonic clock: immune to wall-clock adjustments.
        current_time = time.monotonic()
        if current_time - self.last_show_time > self._DEBOUNCE_S:
            # Ask the widget itself, as update_memory_and_ui does.
            if self.hk_window.isVisible():
                self.hk_window.hide()
                self.hk_window_visible = False
            else:
                self.show_hotkey_window()
            self.last_show_time = current_time

    @staticmethod
    def _resolve_axis(setting, near_word, far_word, near_edge, far_edge, extent):
        """Turn one configured coordinate into a pixel position on the screen.

        *setting* is either *near_word* / *far_word* (case-insensitive) or
        something int() accepts. The result keeps a window of size *extent*
        between *near_edge* and *far_edge*; an unusable setting aligns the
        window with the far edge.
        """
        far_aligned = far_edge - extent
        keyword = str(setting).strip().lower()
        if keyword == near_word:
            position = near_edge
        elif keyword == far_word:
            position = far_aligned
        else:
            try:
                position = int(setting)
            except (ValueError, TypeError, OverflowError):
                return far_aligned
        return max(near_edge, min(position, far_aligned))

    def show_hotkey_window(self):
        """Refresh the popup and show it at the configured on-screen position."""
        self.hk_window.update_content(
            self.memory_data["html_view"],
            self.memory_data["needed_height"]
        )
        screen_rect = QGuiApplication.primaryScreen().availableGeometry()
        w = self.hk_window.width()
        h = self.hk_window.height()
        x = self._resolve_axis(
            self.pos_x, "left", "right", screen_rect.left(), screen_rect.right(), w
        )
        y = self._resolve_axis(
            self.pos_y, "top", "bottom", screen_rect.top(), screen_rect.bottom(), h
        )
        self.hk_window.setGeometry(x, y, w, h)
        self.hk_window.show()
        self.hk_window_visible = True

    def closeEvent(self, event):
        """Stop the fetcher thread before the window closes."""
        self.fetcher.stop()
        event.accept()
# Win32 error code set by CreateMutexW when the named mutex already exists.
_ERROR_ALREADY_EXISTS = 183


def excepthook(exc_type, exc_value, exc_tb):
    """Global exception hook: log the traceback, show a dialog, exit with 1."""
    import traceback

    error_msg = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
    # Save the error log to a file. A failure to write it must not hide the
    # original crash, so it is ignored and the dialog is still shown.
    try:
        with open('crash_log.txt', 'w', encoding='utf-8') as f:
            f.write(f"崩溃时间: {datetime.datetime.now()}\n")
            f.write(error_msg)
    except OSError:
        pass
    # Widgets can only be created once a QApplication exists.
    if QApplication.instance() is not None:
        msg_box = QMessageBox()
        msg_box.setIcon(QMessageBox.Icon.Critical)
        msg_box.setWindowTitle("程序崩溃")
        msg_box.setText(f"错误信息:{str(exc_value)[:100]}\n详情见 crash_log.txt")
        msg_box.exec()
    sys.exit(1)


def _acquire_instance_mutex(mutex_name):
    """Create the named single-instance mutex.

    Returns:
        ``(handle, already_running)``. Off Windows no check is possible and
        ``(None, False)`` is returned.
    """
    if sys.platform != "win32":
        return None, False
    # use_last_error makes ctypes capture the error code right after the
    # call; a separate GetLastError() call may see a value overwritten by
    # the interpreter in between.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.CreateMutexW.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_wchar_p]
    kernel32.CreateMutexW.restype = ctypes.c_void_p  # HANDLE is pointer-sized
    handle = kernel32.CreateMutexW(None, False, mutex_name)
    return handle, ctypes.get_last_error() == _ERROR_ALREADY_EXISTS


def _show_already_running_box():
    """Tell the user that another instance is already running."""
    msg_box = QMessageBox()
    msg_box.setIcon(QMessageBox.Icon.Warning)
    msg_box.setWindowTitle("提示")
    msg_box.setText("程序已运行!")
    flags = (
        Qt.WindowType.Dialog |
        Qt.WindowType.MSWindowsFixedSizeDialogHint |
        Qt.WindowType.CustomizeWindowHint |
        Qt.WindowType.WindowTitleHint |
        Qt.WindowType.WindowCloseButtonHint
    )
    msg_box.setWindowFlags(flags)
    msg_box.layout().setSizeConstraint(QLayout.SizeConstraint.SetFixedSize)
    msg_box.exec()


def main():
    """Application entry point."""
    # Catch otherwise unhandled exceptions so the cause of a crash is recorded.
    sys.excepthook = excepthook

    # High-DPI setup; must happen before the QApplication is created.
    QApplication.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
    # These attributes are deprecated in Qt 6 (scaling is always on) and may
    # be absent from the binding, so set them only where they still exist.
    for attr_name in ("AA_EnableHighDpiScaling", "AA_UseHighDpiPixmaps"):
        attribute = getattr(Qt.ApplicationAttribute, attr_name, None)
        if attribute is not None:
            QApplication.setAttribute(attribute)

    app = QApplication(sys.argv)

    # Single-instance check. The handle is kept in a local so it is clearly
    # owned by this process until exit.
    mutex, already_running = _acquire_instance_mutex("yqy_gupiao")
    if already_running:
        _show_already_running_box()
        sys.exit(0)

    main_window = StealthMonitor()
    main_window.show()
    exit_code = app.exec()
    # Stop the polling thread before the interpreter shuts down.
    main_window.fetcher.stop()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|