[Python] 纯文本查看 复制代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
NSFW监控工具
===============
功能:实时监控NSFW内容,保护系统安全
作者:菜羽凡
版本:v2.0
创建时间:2025-01-11
使用说明:
- 直接运行: python main.py
- 环境设置: python main.py --setup-only
- 创建可执行文件: python main.py --create-exe
主要特性:
- 跨平台Python虚拟环境管理
- 自动镜像库下载(支持Git和ZIP)
- 依赖自动安装
- PyInstaller打包
- 分发包生成
"""
print("准备导入TensorFlow和nsfwpy...")
try:
import tensorflow as tf
tf_version = getattr(tf, '__version__', '未知版本')
print(f"TensorFlow版本: {tf_version}")
except Exception as e:
print(f"TensorFlow导入时出错: {e}")
print("尝试继续加载其他依赖...")
# 尝试导入nsfwpy
try:
from nsfwpy import NSFW
print("nsfwpy库导入成功")
except Exception as e:
print(f"nsfwpy导入时出错: {e}")
print("警告: nsfwpy库未正确安装,程序可能无法正常工作")
import os
import sys
import shutil
import zipfile
import argparse
import subprocess
import time
import json
import hashlib
import logging
from pathlib import Path
from datetime import datetime
from urllib.request import urlretrieve
from urllib.parse import urlparse
import tempfile
import ctypes
# GUI和监控相关模块
import threading
import queue
import tkinter as tk
from tkinter import messagebox
import tkinter.scrolledtext as scrolledtext
import tkinter.ttk as ttk
# 图像处理相关
from PIL import Image, ImageTk
import numpy as np
# Windows特定模块(用于系统托盘和高级功能)
try:
import win32gui
import win32con
import win32api
import psutil
WINDOWS_AVAILABLE = True
except ImportError:
WINDOWS_AVAILABLE = False
win32gui = None
win32con = None
win32api = None
psutil = None
# 添加pip更新功能
def update_pip():
"""更新pip到最新版本"""
try:
print("正在更新pip到最新版本...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
print("pip更新成功")
except Exception as e:
print(f"pip更新失败: {e}")
# 程序启动时更新pip
update_pip()
# 设置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Windows API常量定义
HWND_TOPMOST = -1
HWND_NOTOPMOST = -2
SWP_NOMOVE = 0x0002
SWP_NOSIZE = 0x0001
SWP_SHOWWINDOW = 0x0040
# 全局配置
CONFIG = {
'monitor_paths': [
os.path.join(os.path.expanduser('~'), 'Pictures', 'Screenshots'),
os.path.join(os.path.expanduser('~'), 'Desktop'),
os.path.join(os.path.expanduser('~'), 'Downloads')
],
'detection_threshold': 0.17,
'check_interval': 2.0, # 检查间隔(秒)- 修改为2秒
'enable_screenshot_monitoring': True,
'enable_realtime_monitoring': True,
'auto_delete_suspicious': False,
'show_enhanced_warnings': True,
'max_alerts_per_minute': 5,
'temp_dir': tempfile.gettempdir() + os.sep + 'nsfw_monitor_temp',
'auto_screenshot_interval': 2.0, # 每2秒自动截图
'enable_auto_screenshot': True # 启用自动截图功能
}
# 自动创建安装脚本
AUTOMATION_SCRIPT = """@echo off
chcp 65001 >nul
echo NSFW监控工具 - 自动化设置脚本
echo ================================
echo 正在激活虚拟环境...
call venv\\Scripts\\activate.bat
echo 正在启动NSFW监控工具...
python main.py --setup-only
echo 设置完成!
pause
"""
def create_automation_script():
"""创建自动化安装脚本"""
try:
script_path = os.path.join(os.getcwd(), "自动化安装.bat")
with open(script_path, 'w', encoding='utf-8') as f:
f.write(AUTOMATION_SCRIPT)
logger.info(f"自动化脚本已创建: {script_path}")
return script_path
except Exception as e:
logger.error(f"创建自动化脚本失败: {e}")
return None
# 模块加载时自动创建脚本
if not os.path.exists("自动化安装.bat"):
create_automation_script()
class FileMonitor:
"""文件监控器 - 监控指定目录中的新文件"""
def __init__(self, detector, config=None):
self.detector = detector
self.config = config or CONFIG
self.monitored_paths = self.config['monitor_paths']
self.running = False
self.file_queue = queue.Queue()
self.processed_files = set()
self.detection_results = []
self.alert_queue = queue.Queue()
def start_monitoring(self):
"""开始监控"""
self.running = True
logger.info(f"开始监控目录: {self.monitored_paths}")
# 启动文件处理线程
threading.Thread(target=self._process_files, daemon=True).start()
# 启动监控线程
threading.Thread(target=self._monitor_files, daemon=True).start()
# 启动警告显示线程
threading.Thread(target=self._handle_alerts, daemon=True).start()
logger.info("文件监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.running = False
logger.info("文件监控已停止")
def _monitor_files(self):
"""监控文件变化"""
while self.running:
try:
for path in self.monitored_paths:
if os.path.exists(path):
self._scan_directory(path)
time.sleep(self.config['check_interval'])
except Exception as e:
logger.error(f"文件监控错误: {e}")
def _scan_directory(self, directory):
"""扫描目录中的新文件"""
try:
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
# 只处理图像文件
if self._is_image_file(filename) and file_path not in self.processed_files:
# 检查文件是否完整(避免处理正在写入的文件)
if self._is_file_complete(file_path):
self.processed_files.add(file_path)
self.file_queue.put(file_path)
except Exception as e:
logger.error(f"扫描目录失败 {directory}: {e}")
def _is_image_file(self, filename):
"""判断是否为图像文件"""
image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff', '.webp'}
return Path(filename).suffix.lower() in image_extensions
def _is_file_complete(self, file_path):
"""检查文件是否完整"""
try:
# 等待文件大小稳定
size1 = os.path.getsize(file_path)
time.sleep(0.1)
size2 = os.path.getsize(file_path)
return size1 == size2 and size1 > 0
except:
return False
def _process_files(self):
"""处理文件队列"""
while self.running:
try:
file_path = self.file_queue.get(timeout=1.0)
logger.info(f"检测文件: {file_path}")
# NSFW检测
results = self.detector.predict_image(file_path)
# 判断是否需要警告
if self._should_alert(results):
alert_info = {
'file_path': file_path,
'results': results,
'timestamp': datetime.now()
}
self.alert_queue.put(alert_info)
# 记录检测结果
self.detection_results.append({
'file': file_path,
'results': results,
'timestamp': datetime.now()
})
except queue.Empty:
continue
except Exception as e:
logger.error(f"文件处理错误: {e}")
def _should_alert(self, results):
"""判断是否需要警告"""
# 检查高风险类别 - 处理嵌套details结构和字符串格式
high_risk_categories = ['porn', 'hentai', 'sexy']
threshold = self.config['detection_threshold']
# 优先从details中获取结果(新版格式)
if 'details' in results and isinstance(results['details'], dict):
category_results = results['details']
else:
category_results = results
logger.info(f"文件监控检查警告逻辑,结果来源: {list(category_results.keys())}")
for category in high_risk_categories:
score = category_results.get(category, 0)
logger.info(f"检查文件 {category} 分类,原始值: {score} (类型: {type(score)})")
# 确保是浮点数格式
try:
if isinstance(score, str):
score = float(score)
elif isinstance(score, (int, float)):
score = float(score)
else:
score = 0.0
except (ValueError, TypeError):
score = 0.0
logger.info(f"文件 {category} 转换后分数: {score} > {threshold} ? {score > threshold}")
if score > threshold:
logger.info(f"文件分数达到阈值!{category}: {score} > {threshold},将触发弹窗")
return True
logger.info("文件分数未达到阈值,没有弹窗")
return False
def _handle_alerts(self):
"""处理警告"""
while self.running:
try:
alert_info = self.alert_queue.get(timeout=1.0)
self._show_alert(alert_info)
except queue.Empty:
continue
except Exception as e:
logger.error(f"警告处理错误: {e}")
def log_message(self, message):
"""记录日志消息"""
logger.info(message)
def show_alert(self):
"""显示警告对话框"""
# 构建警告文本
alert_text = f"""敏感内容终将堙灭!
检测阈值: {self.config['detection_threshold']}
监控路径: {', '.join(self.config['monitor_paths'])}
建议:立即检查相关文件内容"""
self.log_message("显示警告对话框: " + alert_text)
try:
# 获取当前活动窗口句柄
hwnd = ctypes.windll.user32.GetActiveWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetForegroundWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
# 确保弹窗在最前面并立即激活
ctypes.windll.user32.SetForegroundWindow(hwnd)
ctypes.windll.user32.ShowWindow(hwnd, 9) # SW_RESTORE
# 使用Windows API显示悬浮窗置顶的警告对话框
# 增强的置顶标志组合:
# 0x40 (MB_ICONINFORMATION) - 信息图标
# 0x1 (MB_OK) - 确定按钮
# 0x1000 (MB_DEFAULT_DESKTOP_ONLY) - 仅默认桌面
# 0x00000004 (MB_SYSTEMMODAL) - 系统模态(置顶)
# 0x00000010 (MB_SETFOREGROUND) - 设置为前台窗口
# 0x00000080 (MB_TOPMOST) - 始终在顶部
# 0x00001000 (MB_RIGHT) - 右对齐文本
# 0x00000020 (MB_RTLREADING) - 从右到左阅读
flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000
self.log_message(f"正在显示置顶弹窗,目标窗口句柄: {hwnd}")
result = ctypes.windll.user32.MessageBoxW(
hwnd,
alert_text,
"NSFW内容检测警告",
flags
)
self.log_message(f"警告对话框已显示并关闭,用户点击结果: {result}")
except Exception as e:
self.log_message(f"显示Windows消息框失败: {e}")
# 备选方案:使用系统托盘方式显示
try:
ctypes.windll.user32.MessageBoxW(
0,
alert_text,
"NSFW内容检测警告",
0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080
)
except Exception as fallback_e:
self.log_message(f"备选弹窗也失败: {fallback_e}")
def show_alert_detailed(self):
"""显示详细检测结果的警告对话框"""
# 构建详细检测结果文本
results = getattr(self, 'detection_results', {})
file_path = getattr(self, 'current_file_path', '')
# 构建简化分数信息
porn_score = results.get('porn', 0)
sexy_score = results.get('sexy', 0)
hentai_score = results.get('hentai', 0)
neutral_score = results.get('neutral', 0)
# 构建风险等级信息
if porn_score > 0.3 or sexy_score > 0.3 or hentai_score > 0.3:
risk_level = "高风险 - 检测到NSFW内容"
elif porn_score > 0.1 or sexy_score > 0.1 or hentai_score > 0.1:
risk_level = "中风险 - 检测到可疑内容"
else:
risk_level = "低风险"
alert_text = f"检测结果:\n"
alert_text += f"色情: {porn_score:.1%}\n"
alert_text += f"性感: {sexy_score:.1%}\n"
alert_text += f"动漫色情: {hentai_score:.1%}\n"
alert_text += f"正常: {neutral_score:.1%}\n"
alert_text += f"\n风险等级: {risk_level}\n"
alert_text += f"文件路径: {file_path}"
self.log_message("显示警告对话框: " + alert_text)
try:
# 获取当前活动窗口句柄并确保置顶
hwnd = ctypes.windll.user32.GetActiveWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetForegroundWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
# 确保弹窗在最前面
ctypes.windll.user32.SetForegroundWindow(hwnd)
ctypes.windll.user32.ShowWindow(hwnd, 9) # SW_RESTORE
# 增强的置顶标志
flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080
self.log_message(f"正在显示详细置顶弹窗,目标窗口句柄: {hwnd}")
result = ctypes.windll.user32.MessageBoxW(
hwnd,
alert_text,
"NSFW内容检测警告",
flags
)
self.log_message(f"警告对话框已显示并关闭,用户点击结果: {result}")
except Exception as e:
self.log_message(f"显示Windows消息框失败: {e}")
# 备选方案
try:
ctypes.windll.user32.MessageBoxW(
0,
alert_text,
"NSFW内容检测警告",
0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080
)
except Exception as fallback_e:
self.log_message(f"备选弹窗也失败: {fallback_e}")
def create_image(self):
"""创建托盘图标"""
try:
image = Image.new('RGB', (64, 64), color='red')
draw = ImageDraw.Draw(image)
draw.text((10, 25), "NSFW", fill='white')
print("已创建托盘图标")
return image
except Exception as e:
print(f"创建图标失败: {e}")
# 返回一个简单的默认图像
return Image.new('RGB', (64, 64), color='red')
def _show_alert(self, alert_info):
"""显示警告窗口"""
try:
# 获取检测结果和文件路径
file_path = alert_info['file_path']
results = alert_info['results']
# 记录详细的检测结果
file_name = os.path.basename(file_path)
self.log_message(f"检测到NSFW内容 - 文件: {file_name}")
# 记录每个类别的检测结果
for category, score in results.items():
self.log_message(f"检测结果 - {category}: {score:.3f}")
self.log_message(f"文件路径: {file_path}")
# 构建详细警告文本并显示弹窗
self._show_detailed_alert(results, file_path)
except Exception as e:
logger.error(f"显示警告窗口失败: {e}")
def _show_detailed_alert(self, results, file_path):
"""显示带详细检测结果的警告弹窗"""
try:
# 构建简化分数信息
porn_score = results.get('porn', 0)
sexy_score = results.get('sexy', 0)
hentai_score = results.get('hentai', 0)
neutral_score = results.get('neutral', 0)
# 构建风险等级信息
if porn_score > 0.3 or sexy_score > 0.3 or hentai_score > 0.3:
risk_level = "高风险 - 检测到NSFW内容"
elif porn_score > 0.1 or sexy_score > 0.1 or hentai_score > 0.1:
risk_level = "中风险 - 检测到可疑内容"
else:
risk_level = "低风险"
# 构建详细警告文本
alert_text = f"""🚨 NSFW内容检测警告 🚨
检测结果:
• 色情内容: {porn_score:.1%}
• 性感内容: {sexy_score:.1%}
• 动漫色情: {hentai_score:.1%}
• 正常内容: {neutral_score:.1%}
🔴 风险等级: {risk_level}
📁 检测文件: {os.path.basename(file_path)}
📂 文件路径: {file_path}
⚠️ 建议立即检查并处理此文件内容"""
self.log_message("显示详细警告对话框: " + alert_text.replace('\n', ' '))
# 获取当前活动窗口句柄并应用置顶逻辑
hwnd = ctypes.windll.user32.GetActiveWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetForegroundWindow()
if hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
# 确保弹窗在最前面并立即激活
ctypes.windll.user32.SetForegroundWindow(hwnd)
ctypes.windll.user32.ShowWindow(hwnd, 9) # SW_RESTORE
# 增强的置顶标志组合(与show_alert保持一致)
flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000
self.log_message(f"正在显示详细置顶弹窗,目标窗口句柄: {hwnd}")
# 使用Windows API显示置顶警告对话框
result = ctypes.windll.user32.MessageBoxW(
hwnd,
alert_text,
"NSFW内容检测警告",
flags
)
self.log_message(f"详细警告对话框已显示,用户点击结果: {result}")
except Exception as e:
# 如果详细弹窗失败,使用简单弹窗作为备选
self.log_message(f"显示详细警告失败,使用简单弹窗: {e}")
try:
fallback_text = f"""NSFW内容检测警告
检测到敏感内容,请立即检查:
{file_path}
检测阈值: {self.config['detection_threshold']}"""
# 备选弹窗也应用置顶逻辑
hwnd_fallback = ctypes.windll.user32.GetActiveWindow()
if hwnd_fallback == 0:
hwnd_fallback = ctypes.windll.user32.GetForegroundWindow()
if hwnd_fallback == 0:
hwnd_fallback = ctypes.windll.user32.GetDesktopWindow()
ctypes.windll.user32.MessageBoxW(
hwnd_fallback,
fallback_text,
"NSFW内容检测警告",
0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000
)
self.log_message("备选弹窗已显示")
except Exception as fallback_e:
self.log_message(f"备选弹窗也失败: {fallback_e}")
class ScreenshotMonitor:
"""实时截图监控器 - 监控系统截图和剪贴板图像"""
def __init__(self, detector, config=None):
self.detector = detector
self.config = config or CONFIG
self.running = False
self.previous_clipboard = None
self.screenshot_count = 0
self.last_screenshot_time = 0
self.temp_dir = tempfile.mkdtemp()
# 防重复扫描:记录已扫描的文件路径和时间戳
self.scanned_files = {} # {文件路径: 上次扫描时间戳}
self.scanned_file_lifetime = 300 # 文件在记录中保存5分钟(秒)
def _should_scan_file(self, file_path):
"""检查文件是否需要扫描(防重复扫描)"""
try:
# 获取文件修改时间戳
file_mtime = os.path.getmtime(file_path)
# 如果文件不在扫描记录中,或者文件已被修改
if file_path not in self.scanned_files:
self.scanned_files[file_path] = file_mtime
return True
elif self.scanned_files[file_path] != file_mtime:
# 文件已修改,重新扫描
self.scanned_files[file_path] = file_mtime
return True
else:
# 文件未修改,跳过扫描
logger.debug(f"跳过已扫描文件: {os.path.basename(file_path)}")
return False
except Exception as e:
logger.error(f"检查文件扫描状态失败: {e}")
return True
def _cleanup_old_scanned_files(self):
"""清理过期的扫描记录"""
try:
current_time = time.time()
expired_files = []
for file_path, last_modified in self.scanned_files.items():
# 检查文件是否仍然存在
if not os.path.exists(file_path):
expired_files.append(file_path)
# 检查记录是否过期(保存时间超过scanned_file_lifetime)
elif current_time - os.path.getmtime(file_path) > self.scanned_file_lifetime:
expired_files.append(file_path)
# 删除过期记录
for file_path in expired_files:
self.scanned_files.pop(file_path, None)
except Exception as e:
logger.error(f"清理扫描记录失败: {e}")
def start_monitoring(self):
"""开始监控"""
self.running = True
logger.info("实时截图监控已启动")
# 启动多个监控线程
threading.Thread(target=self._monitor_clipboard_images, daemon=True).start()
threading.Thread(target=self._monitor_screen_captures, daemon=True).start()
threading.Thread(target=self._monitor_windows_activity, daemon=True).start()
# 启动自动截图监控(每2秒一次)
if self.config.get('enable_auto_screenshot', True):
threading.Thread(target=self._auto_screenshot_monitor, daemon=True).start()
logger.info("自动截图监控已启动(每2秒截图检测)")
def stop_monitoring(self):
"""停止监控"""
self.running = False
logger.info("截图监控已停止")
def _monitor_clipboard_images(self):
"""监控剪贴板中的图像"""
while self.running:
try:
self._check_clipboard_for_images()
time.sleep(1.0) # 每秒检查一次
except Exception as e:
logger.error(f"剪贴板图像监控错误: {e}")
def _check_clipboard_for_images(self):
"""检查剪贴板中的图像数据"""
try:
import tkinter as tk
from PIL import ImageGrab
# 尝试获取剪贴板中的图像
root = tk.Tk()
root.withdraw()
try:
# 检查剪贴板是否有图像数据
image_data = root.clipboard_get()
root.destroy()
# 如果剪贴板内容改变
if image_data != self.previous_clipboard:
self.previous_clipboard = image_data
# 尝试获取实际的图像
try:
# 使用PIL获取剪贴板图像
screenshot = ImageGrab.grabclipboard()
if screenshot is not None:
# 保存临时截图文件
timestamp = int(time.time() * 1000)
temp_filename = f"clipboard_screenshot_{timestamp}.png"
temp_path = os.path.join(self.temp_dir, temp_filename)
screenshot.save(temp_path, "PNG")
logger.info(f"检测到剪贴板截图: {temp_filename}")
# 检测截图内容
self._analyze_screenshot(temp_path, "剪贴板截图")
# 清理临时文件
threading.Timer(10.0, lambda: self._cleanup_temp_file(temp_path)).start()
except Exception as e:
logger.error(f"获取剪贴板图像失败: {e}")
except tk.TclError:
# 剪贴板中没有文本数据
root.destroy()
except Exception as e:
logger.error(f"剪贴板检查失败: {e}")
def _monitor_screen_captures(self):
"""监控屏幕捕获活动"""
# 使用Windows API监控截图工具
if os.name == 'nt':
self._monitor_windows_screenshots()
else:
self._monitor_unix_screenshots()
def _monitor_windows_screenshots(self):
"""Windows截图监控"""
try:
import win32gui
import win32con
import psutil
# 监控的截图程序
screenshot_apps = [
"SnippingTool.exe", # Windows 10/11自带截图工具
"SnippingTool32.exe", # 32位版本
"ScreenSketch.exe", # 新版截图工具
"obs64.exe", # OBS Studio(常用于录屏)
"lightshot.exe", # Lightshot
"greenshot.exe" # GreenShot
]
known_processes = set()
while self.running:
try:
current_processes = set()
# 检查运行的进程
for proc in psutil.process_iter(['pid', 'name']):
try:
process_name = proc.info['name'].lower()
if any(app.lower() in process_name for app in screenshot_apps):
current_processes.add(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 检测新启动的截图程序
new_processes = current_processes - known_processes
if new_processes:
for pid in new_processes:
logger.info(f"检测到截图程序启动: PID {pid}")
# 稍等片刻让截图程序完全启动
threading.Timer(2.0, lambda: self._wait_for_screenshot(pid)).start()
known_processes = current_processes
time.sleep(2.0) # 每2秒检查一次
except Exception as e:
logger.error(f"Windows截图监控错误: {e}")
time.sleep(5.0)
except ImportError:
logger.warning("win32gui不可用,使用基础屏幕监控")
# 使用定期屏幕捕获作为备用方案
threading.Thread(target=self._periodic_screen_capture, daemon=True).start()
def _wait_for_screenshot(self, pid):
"""等待截图完成"""
try:
# 等待3秒让用户完成截图
time.sleep(3.0)
# 尝试获取屏幕截图
self._capture_and_analyze_screen()
except Exception as e:
logger.error(f"等待截图失败: {e}")
def _capture_and_analyze_screen(self):
"""捕获当前屏幕并进行检测"""
try:
from PIL import ImageGrab
# 截取当前屏幕
screenshot = ImageGrab.grab()
# 保存临时截图
timestamp = int(time.time() * 1000)
temp_filename = f"screen_capture_{timestamp}.png"
temp_path = os.path.join(self.temp_dir, temp_filename)
screenshot.save(temp_path, "PNG")
# 防重复扫描检查
if not self._should_scan_file(temp_path):
os.remove(temp_path) # 删除临时文件,因为没有扫描
return
logger.info(f"捕获并分析屏幕截图: {temp_filename}")
# 分析截图内容
self._analyze_screenshot(temp_path, "屏幕捕获")
# 清理过期记录
self._cleanup_old_scanned_files()
# 清理临时文件
threading.Timer(30.0, lambda: self._cleanup_temp_file(temp_path)).start()
except Exception as e:
logger.error(f"屏幕捕获失败: {e}")
def _auto_screenshot_monitor(self):
"""自动截图监控 - 每2秒截取并检测当前屏幕"""
logger.info("开始自动截图监控(每2秒)")
while self.running:
try:
# 每2秒截取当前屏幕
self._capture_and_analyze_screen()
time.sleep(self.config.get('auto_screenshot_interval', 2.0))
except Exception as e:
logger.error(f"自动截图监控错误: {e}")
time.sleep(2.0)
def _periodic_screen_capture(self):
"""定期屏幕捕获(备用方案)"""
while self.running:
try:
current_time = time.time()
# 每5秒检查一次是否有新的截图内容
if current_time - self.last_screenshot_time > 5.0:
self._capture_and_analyze_screen()
self.last_screenshot_time = current_time
time.sleep(2.0)
except Exception as e:
logger.error(f"定期屏幕捕获错误: {e}")
time.sleep(5.0)
def _monitor_windows_activity(self):
"""监控Windows活动窗口变化"""
if os.name != 'nt':
return
try:
import win32gui
import win32con
last_window = None
while self.running:
try:
# 获取当前活动窗口
current_window = win32gui.GetForegroundWindow()
if current_window != last_window:
last_window = current_window
window_title = win32gui.GetWindowText(current_window)
# 检查是否是截图相关窗口
if any(keyword in window_title.lower() for keyword in
['截图', 'snip', 'capture', 'shot']):
logger.info(f"检测到截图窗口: {window_title}")
threading.Timer(1.0, self._capture_and_analyze_screen).start()
time.sleep(1.0)
except Exception as e:
logger.error(f"窗口监控错误: {e}")
time.sleep(5.0)
except ImportError:
logger.warning("win32gui不可用,跳过窗口活动监控")
def _monitor_unix_screenshots(self):
"""Unix/Linux/macOS截图监控"""
# 基础实现 - 定期捕获屏幕
while self.running:
try:
# 检查是否有截图相关的进程
result = subprocess.run(['pgrep', '-f', 'screenshot|shot|capture'],
capture_output=True, text=True)
if result.returncode == 0:
logger.info("检测到截图进程")
threading.Timer(2.0, self._capture_and_analyze_screen).start()
time.sleep(3.0)
except Exception as e:
logger.error(f"Unix截图监控错误: {e}")
time.sleep(5.0)
def _analyze_screenshot(self, image_path, source):
"""分析截图内容"""
try:
logger.info(f"开始分析截图: {os.path.basename(image_path)}")
# NSFW检测
results = self.detector.predict_image(image_path)
# 记录检测结果
self.screenshot_count += 1
detection_record = {
'source': source,
'file_path': image_path,
'results': results,
'timestamp': datetime.now(),
'count': self.screenshot_count
}
logger.info(f"截图检测结果 [{source}]: {results}")
# 判断是否需要警告
if self._should_alert(results):
self._show_screenshot_alert(detection_record)
except Exception as e:
logger.error(f"截图分析失败: {e}")
def _should_alert(self, results):
"""判断是否需要警告"""
# 检查高风险类别 - 处理嵌套details结构和字符串格式
high_risk_categories = ['porn', 'hentai', 'sexy']
threshold = self.config['detection_threshold']
# 优先从details中获取结果(新版格式)
if 'details' in results and isinstance(results['details'], dict):
category_results = results['details']
else:
category_results = results
logger.info(f"检查警告逻辑,结果来源: {list(category_results.keys())}")
for category in high_risk_categories:
score = category_results.get(category, 0)
logger.info(f"检查 {category} 分类,原始值: {score} (类型: {type(score)})")
# 确保是浮点数格式
try:
if isinstance(score, str):
score = float(score)
elif isinstance(score, (int, float)):
score = float(score)
else:
score = 0.0
except (ValueError, TypeError):
score = 0.0
logger.info(f"{category} 转换后分数: {score} > {threshold} ? {score > threshold}")
if score > threshold:
logger.info(f"分数达到阈值!{category}: {score} > {threshold},将触发弹窗")
return True
logger.info("分数未达到阈值,没有弹窗")
return False
def log_message(self, message):
"""记录日志"""
try:
import datetime
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
logger.info(log_entry) if 'logger' in globals() else None
except Exception as e:
print(f"记录日志失败: {e}")
def show_alert(self):
"""显示警告对话框"""
# 构建简化分数信息
results = self.config.get('results', {})
porn_score = results.get('porn', 0)
sexy_score = results.get('sexy', 0)
neutral_score = results.get('neutral', 0)
# 构建风险等级信息
if porn_score > 0.3 or sexy_score > 0.3:
risk_level = "高风险 - 检测到NSFW内容"
elif porn_score > 0.1 or sexy_score > 0.1:
risk_level = "中风险 - 检测到可疑内容"
else:
risk_level = "低风险"
alert_text = f"检测结果:\n"
alert_text += f"色情: {porn_score:.1%}\n"
alert_text += f"性感: {sexy_score:.1%}\n"
alert_text += f"正常: {neutral_score:.1%}\n"
alert_text += f"\n风险等级: {risk_level}"
self.log_message("显示警告对话框: " + alert_text)
try:
# 使用Windows API显示悬浮窗置顶的警告对话框
# 获取当前活动窗口句柄
hwnd = ctypes.windll.user32.GetForegroundWindow()
logger.info(f"show_alert - 当前活动窗口句柄: {hwnd}")
# 如果获取失败,尝试获取桌面窗口
if not hwnd or hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
logger.info(f"使用桌面窗口句柄: {hwnd}")
# 设置窗口到前台
if hwnd and hwnd != 0:
ctypes.windll.user32.SetForegroundWindow(hwnd)
# 确保窗口可见
ctypes.windll.user32.ShowWindow(hwnd, 9) # SW_RESTORE
# 使用增强的标志组合确保弹窗置顶
flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000 # MB_TOPMOST | MB_DEFAULT_DESKTOP_ONLY
logger.info(f"show_alert - 使用标志: 0x{flags:X}")
result = ctypes.windll.user32.MessageBoxW(
hwnd,
alert_text,
"NSFW内容检测警告",
flags
)
self.log_message(f"警告对话框已显示并关闭,用户点击结果: {result}")
except Exception as e:
self.log_message(f"显示Windows消息框失败: {e}")
def create_image(self):
"""创建托盘图标"""
try:
image = Image.new('RGB', (64, 64), color='red')
draw = ImageDraw.Draw(image)
draw.text((10, 25), "NSFW", fill='white')
print("已创建托盘图标")
return image
except Exception as e:
print(f"创建图标失败: {e}")
# 返回一个简单的默认图像
return Image.new('RGB', (64, 64), color='red')
def _show_screenshot_alert(self, detection_record):
"""显示截图警告 - 使用Windows API显示详细检测结果"""
try:
# 构建结果信息
results = detection_record['results']
file_path = detection_record['file_path']
source = detection_record['source']
# 记录详细的检测结果
self.log_message(f"检测到NSFW截图 - 来源: {source}")
# 记录每个类别的检测结果 - 安全处理
for category, score in results.items():
if isinstance(score, dict):
# details是嵌套字典,跳过或特殊处理
self.log_message(f"截图检测结果 - {category}: {dict(score)}")
elif isinstance(score, (int, float)):
# 数值类型可以格式化
self.log_message(f"截图检测结果 - {category}: {score:.3f}")
else:
# 其他类型直接显示
self.log_message(f"截图检测结果 - {category}: {score}")
self.log_message(f"截图文件路径: {file_path}")
# 显示详细的Windows API警告对话框
self._show_detailed_screenshot_alert(results, file_path, source)
logger.info(f"截图详细警告已显示: {source}")
except Exception as e:
logger.error(f"显示截图警告失败: {e}")
def _show_detailed_screenshot_alert(self, results, file_path, source):
"""显示带详细检测结果的截图警告弹窗"""
try:
# 优先从details中获取结果(新版格式)
if 'details' in results and isinstance(results['details'], dict):
category_results = results['details']
else:
category_results = results
# 构建简化分数信息 - 安全处理字符串格式
def safe_float(value, default=0.0):
try:
if isinstance(value, str):
return float(value)
elif isinstance(value, (int, float)):
return float(value)
else:
return default
except (ValueError, TypeError):
return default
porn_score = safe_float(category_results.get('porn', 0))
sexy_score = safe_float(category_results.get('sexy', 0))
hentai_score = safe_float(category_results.get('hentai', 0))
neutral_score = safe_float(category_results.get('neutral', 0))
# 构建风险等级信息
if porn_score > 0.3 or sexy_score > 0.3 or hentai_score > 0.3:
risk_level = "高风险 - 检测到NSFW内容"
elif porn_score > 0.1 or sexy_score > 0.1 or hentai_score > 0.1:
risk_level = "中风险 - 检测到可疑内容"
else:
risk_level = "低风险"
# 构建详细警告文本
alert_text = f"""📸 NSFW截图检测警告 📸
检测结果:
• 色情内容: {porn_score:.1%}
• 性感内容: {sexy_score:.1%}
• 动漫色情: {hentai_score:.1%}
• 正常内容: {neutral_score:.1%}
🔴 风险等级: {risk_level}
📷 检测来源: {source}
📁 文件名称: {os.path.basename(file_path)}
⚠️ 建议立即检查并处理此截图内容"""
self.log_message("显示截图详细警告对话框: " + alert_text.replace('\n', ' '))
# 使用Windows API显示置顶警告对话框
# 获取当前活动窗口句柄
hwnd = ctypes.windll.user32.GetForegroundWindow()
logger.info(f"截图详细警告 - 当前活动窗口句柄: {hwnd}")
# 如果获取失败,尝试获取桌面窗口
if not hwnd or hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
logger.info(f"使用桌面窗口句柄: {hwnd}")
# 设置窗口到前台
if hwnd and hwnd != 0:
ctypes.windll.user32.SetForegroundWindow(hwnd)
# 确保窗口可见
ctypes.windll.user32.ShowWindow(hwnd, 9) # SW_RESTORE
# 使用增强的标志组合确保弹窗置顶
flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000 # MB_TOPMOST | MB_DEFAULT_DESKTOP_ONLY
logger.info(f"截图详细警告 - 使用标志: 0x{flags:X}")
result = ctypes.windll.user32.MessageBoxW(
hwnd,
alert_text,
"NSFW截图检测警告",
flags
)
self.log_message(f"截图详细警告对话框已显示,用户点击结果: {result}")
except Exception as e:
# 如果详细弹窗失败,使用简单弹窗作为备选
self.log_message(f"显示截图详细警告失败,使用简单弹窗: {e}")
try:
fallback_text = f"""NSFW截图检测警告
检测到敏感截图内容,请立即检查。
检测来源: {source}
检测阈值: {self.config['detection_threshold']}
建议立即检查并处理此截图内容"""
# 备选弹窗也应用相同的置顶逻辑
hwnd = ctypes.windll.user32.GetForegroundWindow()
if not hwnd or hwnd == 0:
hwnd = ctypes.windll.user32.GetDesktopWindow()
if hwnd and hwnd != 0:
ctypes.windll.user32.SetForegroundWindow(hwnd)
ctypes.windll.user32.ShowWindow(hwnd, 9)
fallback_flags = 0x40 | 0x1 | 0x1000 | 0x00000004 | 0x00000010 | 0x00000080 | 0x00001000
logger.info(f"截图备选弹窗 - 使用标志: 0x{fallback_flags:X}")
ctypes.windll.user32.MessageBoxW(
hwnd,
fallback_text,
"NSFW截图检测警告",
fallback_flags
)
self.log_message("截图备选弹窗已显示")
except Exception as fallback_e:
self.log_message(f"截图备选弹窗也失败: {fallback_e}")
def _cleanup_temp_file(self, file_path):
"""清理临时文件"""
try:
if os.path.exists(file_path):
os.remove(file_path)
logger.debug(f"已清理临时文件: {os.path.basename(file_path)}")
except Exception as e:
logger.error(f"清理临时文件失败: {e}")
def get_stats(self):
"""获取监控统计信息"""
return {
'total_screenshots': self.screenshot_count,
'monitoring_active': self.running,
'temp_dir': self.temp_dir,
'scanned_files_count': len(self.scanned_files),
'detection_threshold': self.config['detection_threshold'],
'auto_screenshot_enabled': self.config.get('enable_auto_screenshot', True),
'auto_screenshot_interval': self.config.get('auto_screenshot_interval', 2.0)
}
class Config:
"""配置管理类"""
def __init__(self):
self.project_dir = os.path.dirname(os.path.abspath(__file__))
self.venv_dir = os.path.join(self.project_dir, "venv")
self.mirror_repo_dir = os.path.join(self.project_dir, "mirror_repo")
self.dist_package_dir = os.path.join(self.project_dir, "dist_package")
self.logs_dir = os.path.join(self.project_dir, "logs")
# 确保目录存在
for directory in [self.logs_dir]:
os.makedirs(directory, exist_ok=True)
# 默认镜像库URL
self.default_mirror_url = "https://github.com/GantMan/nsfw_model/archive/refs/heads/master.zip"
class NSFWDetector:
"""NSFW检测器 - 仅使用真实模型"""
def __init__(self):
print("初始化NSFW检测器(使用真实模型)...")
self.detector = None
self.tf_available = False
self.nsfwpy_available = False
# 检查依赖是否可用
try:
import tensorflow as tf
self.tf_available = True
except:
pass
try:
from nsfwpy import NSFW
self.nsfwpy_available = True
except:
pass
if self.tf_available and self.nsfwpy_available:
self.initialize_detector()
else:
print("警告: 缺少必要依赖,NSFW检测功能将不可用")
if not self.tf_available:
print(" - TensorFlow未正确安装")
if not self.nsfwpy_available:
print(" - nsfwpy未正确安装")
def initialize_detector(self):
"""初始化真实的NSFW模型"""
try:
print("正在加载nsfwpy模型...")
# 仅在TensorFlow可用时尝试GPU配置
if self.tf_available:
import tensorflow as tf
# 限制TensorFlow内存使用
try:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
try:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
print("已设置GPU内存增长模式")
except RuntimeError as e:
print(f"GPU内存配置失败: {e}")
except Exception as e:
print(f"TensorFlow GPU配置时出错: {e}")
# 仅在nsfwpy可用时尝试初始化
if self.nsfwpy_available:
from nsfwpy import NSFW
self.detector = NSFW()
print("NSFW模型加载成功")
except Exception as e:
print(f"NSFW模型加载失败: {e}")
import traceback
traceback.print_exc()
self.detector = None
def capture_screen(self):
"""捕获整个屏幕"""
try:
print("正在捕获屏幕...")
image = ImageGrab.grab()
print(f"成功捕获屏幕,尺寸: {image.size}")
return image
except Exception as e:
print(f"屏幕捕获失败: {e}")
return None
def capture_active_window(self):
"""捕获活动窗口"""
try:
print("正在捕获活动窗口...")
# 简化版本,仍然捕获整个屏幕
image = ImageGrab.grab()
print(f"成功捕获活动窗口区域,尺寸: {image.size}")
return image
except Exception as e:
print(f"活动窗口捕获失败: {e}")
return None
def predict_image(self, image_path):
"""预测单个图像内容"""
try:
print("执行NSFW检测...")
if not image_path:
print("没有图像可检测")
return None
# 检查依赖是否可用
if not self.tf_available or not self.nsfwpy_available:
print("警告: 缺少必要依赖,无法执行NSFW检测")
# 返回安全的默认值,避免程序崩溃
return {
'safe': 0.0,
'nsfw': 0.0,
'details': {"error": "缺少TensorFlow或nsfwpy依赖"}
}
if not self.detector:
print("警告: 真实模型未加载,尝试重新初始化...")
self.initialize_detector()
if not self.detector:
print("错误: 无法加载NSFW模型,检测失败")
# 返回安全的默认值,避免程序崩溃
return {
'safe': 0.0,
'nsfw': 0.0,
'details': {"error": "无法初始化NSFW模型"}
}
print("使用真实NSFW模型进行检测...")
start_time = time.time()
# 优化图像处理以减少内存使用
max_size = 1024 # 限制图像最大尺寸以减少内存消耗
image = Image.open(image_path)
if max(image.size) > max_size:
print(f"图像尺寸过大 ({image.size}),调整大小...")
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
print(f"调整后的图像尺寸: {image.size}")
# 使用真实模型检测
result = self.detector.predict_pil_image(image)
end_time = time.time()
print(f"检测完成,耗时: {end_time - start_time:.2f}秒")
print(f"原始检测结果类型: {type(result)}")
# 增强防御性编程,处理可能的元组或其他数据类型
nsfw_score = 0.0
safe_score = 0.0
# 检查result是否为字典类型
if isinstance(result, dict):
print(f"检测结果是字典,键: {list(result.keys())}")
# 使用get方法安全访问字典键,设置默认值
try:
nsfw_score = (float(result.get('porn', 0)) +
float(result.get('hentai', 0)) +
float(result.get('sexy', 0)))
safe_score = (
float(result.get('drawings', 0)) +
float(result.get('neutral', 0))
)
except (ValueError, TypeError) as e:
print(f"字典值转换失败: {e}")
nsfw_score = 0.0
safe_score = 1.0
else:
# 如果不是字典,使用默认值
print(f"检测结果不是预期格式,使用默认值")
nsfw_score = 0.0
safe_score = 1.0
# 构建标准化的返回结果
normalized_result = {
'nsfw': nsfw_score,
'safe': safe_score,
'details': result if isinstance(result, dict) else {},
'processing_time': end_time - start_time
}
print(f"NSFW检测结果: NSFW={nsfw_score:.3f}, Safe={safe_score:.3f}")
return normalized_result
except Exception as e:
print(f"NSFW检测过程中发生错误: {e}")
import traceback
traceback.print_exc()
# 返回安全的默认值
return {
'nsfw': 0.0,
'safe': 1.0,
'details': {"error": str(e)},
'processing_time': 0.0
}
def _heuristic_detection(self, image):
"""基于图像特征的启发式检测 - 修复算法"""
try:
import numpy as np
import cv2
# 转换为numpy数组
img_array = np.array(image)
# 计算基本统计特征
mean_color = np.mean(img_array, axis=(0, 1))
color_variance = np.var(img_array, axis=(0, 1))
brightness = np.mean(img_array)
contrast = np.std(img_array)
# 改进的启发式规则 - 平衡默认值
results = {
'drawings': 0.0, # 默认零值
'hentai': 0.0, # 默认零值
'neutral': 0.6, # 基准值
'porn': 0.0, # 默认零值
'sexy': 0.0 # 默认零值
}
# 重新设计特征检测逻辑
# 亮度分析 - 正常图像应该有适当的亮度
if 100 <= brightness <= 200: # 正常亮度范围
results['neutral'] += 0.3 # 增强正常图像的得分
elif brightness > 200: # 很亮的图像
results['neutral'] += 0.1 # 轻度增强
results['drawings'] += 0.1 # 可能是漫画或白底图
elif brightness < 100: # 很暗的图像
results['porn'] += 0.2 # 暗图像可能包含敏感内容
results['hentai'] += 0.1
# 对比度分析
if contrast < 30: # 低对比度 - 可能是单调的正常图像
results['neutral'] += 0.2
elif contrast > 60: # 高对比度 - 可能是敏感内容
results['sexy'] += 0.15
results['porn'] += 0.1
# 颜色分布分析 - 肤色检测
# 更严格的肤色检测
img_hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV) if len(img_array.shape) == 3 else None
if img_hsv is not None:
# HSV肤色范围检测
lower_skin = np.array([0, 20, 70], dtype=np.uint8)
upper_skin = np.array([20, 255, 255], dtype=np.uint8)
skin_mask = cv2.inRange(img_hsv, lower_skin, upper_skin)
skin_ratio = np.sum(skin_mask > 0) / (img_array.shape[0] * img_array.shape[1])
if skin_ratio > 0.15: # 肤色比例超过15%
results['sexy'] += 0.3 # 显著增加性感得分
results['porn'] += 0.2
elif skin_ratio > 0.08: # 轻度肤色
results['sexy'] += 0.15
results['neutral'] -= 0.1 # 轻度降低正常得分
else:
# 简单的RGB肤色检测作为备选
skin_colors = np.array([255, 220, 177]) # 近似肤色
skin_distance = np.min(np.linalg.norm(img_array - skin_colors, axis=2))
if skin_distance < 40: # 检测到肤色
results['sexy'] += 0.2
results['porn'] += 0.1
# 图像内容复杂度分析
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) if len(img_array.shape) == 3 else img_array
edge_density = np.sum(cv2.Canny(gray, 50, 150) > 0) / (gray.shape[0] * gray.shape[1])
if edge_density < 0.1: # 边缘很少 - 可能是纯色背景
results['neutral'] += 0.2 # 正常图像
elif edge_density > 0.3: # 边缘丰富 - 可能是复杂图像
results['drawings'] += 0.1 # 可能是漫画或复杂图形
results['neutral'] += 0.1
# 确保所有值非负
for key in results:
results[key] = max(results[key], 0.0)
# 归一化结果 - 确保总和为1
total = sum(results.values())
if total > 0:
for key in results:
results[key] = results[key] / total
else:
# 如果所有分数都为0,设置为默认分布
results = {'drawings': 0.0, 'hentai': 0.0, 'neutral': 1.0, 'porn': 0.0, 'sexy': 0.0}
logger.info(f"启发式检测结果: {results}")
return results
except Exception as e:
logger.error(f"启发式检测失败: {e}")
# 出错时返回更合理的默认值
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.8, 'porn': 0.1, 'sexy': 0.1}
def initialize(self, model_path=None):
"""初始化检测器"""
try:
# 检查依赖
self._check_dependencies()
# 加载模型
if model_path:
self.model_path = model_path
else:
# 使用默认模型路径
default_path = os.path.join("mirror_repo", "nsfw_model")
if os.path.exists(default_path):
self.model_path = default_path
else:
# 创建本地模型结构
self._create_local_model_structure()
self.model_path = default_path
# 这里加载实际的模型
# self.model = load_model(self.model_path) # 示例代码
self.is_initialized = True
logger.info("NSFW检测器初始化成功")
return True
except Exception as e:
logger.error(f"NSFW检测器初始化失败: {e}")
return False
def _check_dependencies(self):
"""检查必要的依赖"""
required_packages = ['tensorflow', 'numpy', 'Pillow']
missing_packages = []
for package in required_packages:
try:
__import__(package.lower().replace('-', '_'))
except ImportError:
missing_packages.append(package)
if missing_packages:
logger.warning(f"缺少依赖包: {', '.join(missing_packages)}")
logger.info("请运行: python main.py --setup-only 来安装依赖")
def _create_local_model_structure(self):
"""创建本地模型目录结构"""
try:
model_dirs = [
"mirror_repo/nsfw_model",
"mirror_repo/nsfw_model/data",
"mirror_repo/nsfw_model/model"
]
for dir_path in model_dirs:
os.makedirs(dir_path, exist_ok=True)
# 创建模型信息文件
readme_content = """# NSFW模型文件
## 获取方法
### 方法1: 自动下载(推荐)
运行以下命令自动下载真实的NSFW模型:
```bash
python main.py --setup-only
```
### 方法2: 手动下载
1. 访问: https://github.com/GantMan/nsfw_model
2. 下载master分支ZIP文件
3. 解压到 mirror_repo/nsfw_model/ 目录
### 方法3: 使用其他模型源
如果您有其他的NSFW检测模型,可以将其放置在以下目录:
- 主要模型文件: mirror_repo/nsfw_model/model/
- 模型数据: mirror_repo/nsfw_model/data/
## 支持的模型格式
- TensorFlow SavedModel格式 (.pb)
- Keras模型格式 (.h5)
- ONNX格式 (.onnx)
## 使用说明
模型下载完成后,重新运行程序即可自动加载并开始监控。
"""
readme_path = "mirror_repo/nsfw_model/README.md"
with open(readme_path, 'w', encoding='utf-8') as f:
f.write(readme_content)
logger.info("已创建本地模型结构目录")
except Exception as e:
logger.error(f"创建本地模型结构失败: {e}")
class VirtualEnvironmentManager:
"""虚拟环境管理器"""
def __init__(self, project_dir):
self.project_dir = project_dir
self.venv_dir = os.path.join(project_dir, "venv")
self.python_path = None
self.pip_path = None
def check_python_availability(self):
"""检查Python是否可用"""
try:
result = subprocess.run([sys.executable, "--version"],
capture_output=True, text=True)
logger.info(f"Python版本: {result.stdout.strip()}")
return True
except Exception as e:
logger.error(f"Python检查失败: {e}")
return False
def create_virtual_environment(self):
"""创建虚拟环境"""
try:
logger.info("正在创建虚拟环境...")
subprocess.run([sys.executable, "-m", "venv", self.venv_dir],
check=True)
# 设置Python和pip路径
if os.name == 'nt': # Windows
self.python_path = os.path.join(self.venv_dir, "Scripts", "python.exe")
self.pip_path = os.path.join(self.venv_dir, "Scripts", "pip.exe")
else: # Unix/Linux/macOS
self.python_path = os.path.join(self.venv_dir, "bin", "python")
self.pip_path = os.path.join(self.venv_dir, "bin", "pip")
logger.info("虚拟环境创建成功")
return True
except subprocess.CalledProcessError as e:
logger.error(f"虚拟环境创建失败: {e}")
return False
def download_mirror_repo(self, mirror_url=None):
"""下载镜像库"""
if not mirror_url:
mirror_url = Config().default_mirror_url
try:
logger.info(f"正在下载镜像库: {mirror_url}")
# 创建镜像库目录
mirror_dir = os.path.join(self.project_dir, "mirror_repo")
os.makedirs(mirror_dir, exist_ok=True)
# 判断URL类型并下载
parsed_url = urlparse(mirror_url)
if parsed_url.path.endswith('.zip'):
# ZIP文件下载
zip_path = os.path.join(mirror_dir, "downloaded.zip")
urlretrieve(mirror_url, zip_path)
# 解压ZIP文件
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(mirror_dir)
# 查找解压后的目录并重命名
extracted_dirs = [d for d in os.listdir(mirror_dir)
if os.path.isdir(os.path.join(mirror_dir, d))
and d != "__pycache__"]
if extracted_dirs:
extracted_dir = extracted_dirs[0] # 取第一个解压的目录
old_path = os.path.join(mirror_dir, extracted_dir)
new_path = os.path.join(mirror_dir, "nsfw_model")
# 如果已存在nsfw_model目录,先删除
if os.path.exists(new_path):
shutil.rmtree(new_path)
shutil.move(old_path, new_path)
# 删除下载的ZIP文件
os.remove(zip_path)
logger.info("镜像库ZIP文件下载和解压完成")
else:
# Git仓库下载(需要git命令行工具)
try:
nsfw_model_dir = os.path.join(mirror_dir, "nsfw_model")
# 如果目录已存在,先删除
if os.path.exists(nsfw_model_dir):
shutil.rmtree(nsfw_model_dir)
subprocess.run(["git", "clone", mirror_url, nsfw_model_dir],
check=True, cwd=mirror_dir)
logger.info("镜像库Git仓库克隆完成")
except (subprocess.CalledProcessError, FileNotFoundError) as e:
logger.error(f"Git克隆失败,请确保已安装Git: {e}")
return False
return True
except Exception as e:
logger.error(f"下载镜像库失败: {e}")
# 下载失败时创建本地模型结构
logger.info("正在创建本地模型结构...")
nsfw_detector = NSFWDetector()
nsfw_detector._create_local_model_structure()
return False # 虽然失败但已创建了本地结构
def install_dependencies_in_venv(self):
"""在虚拟环境中安装依赖"""
try:
# 设置全局镜像源配置,优先使用阿里云
aliyun_mirror = "https://mirrors.aliyun.com/pypi/simple/"
# 立即设置全局镜像源
logger.info(f"设置全局pip镜像源为阿里云: {aliyun_mirror}")
try:
subprocess.run([self.pip_path, "config", "set", "global.index-url", aliyun_mirror],
check=True, timeout=60)
subprocess.run([self.pip_path, "config", "set", "global.trusted-host", "mirrors.aliyun.com"],
check=True, timeout=60)
logger.info("全局镜像源配置成功")
except Exception as e:
logger.warning(f"设置全局镜像源失败: {e}")
# 定义大文件包列表(需要强制使用阿里云镜像)
large_packages = [
"tensorflow", "tensorflow-cpu", "tensorflow-gpu",
"numpy", "pillow", "opencv-python", "opencv-contrib-python",
"scipy", "matplotlib", "pandas", "scikit-learn",
"psutil", "pillow-heif", "torch", "torchvision",
"nsfwpy" # 虽然不是很大,但经常下载失败
]
# 备用镜像源
backup_mirrors = [
"https://pypi.tuna.tsinghua.edu.cn/simple/",
"https://pypi.douban.com/simple/",
"https://pypi.mirrors.ustc.edu.cn/simple/"
]
# 升级pip(使用第一个镜像源)
logger.info(f"正在升级pip(使用镜像源:{mirror_urls[0]})...")
try:
subprocess.run([self.pip_path, "install", "-i", mirror_urls[0], "--upgrade", "pip"],
check=True, timeout=300)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
# 如果第一个镜像失败,尝试其他镜像
for mirror_url in mirror_urls[1:]:
try:
logger.info(f"尝试使用备用镜像源:{mirror_url}")
subprocess.run([self.pip_path, "install", "-i", mirror_url, "--upgrade", "pip"],
check=True, timeout=300)
break
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
continue
else:
logger.warning("pip升级失败,使用默认源")
subprocess.run([self.pip_path, "install", "--upgrade", "pip"], check=True, timeout=300)
# 读取requirements.txt并安装
requirements_file = os.path.join(self.project_dir, "requirements.txt")
if os.path.exists(requirements_file):
logger.info("正在安装依赖包(使用多个镜像源加速下载)...")
# 尝试使用多个镜像源安装requirements.txt中的依赖
installed_packages = []
failed_packages = []
with open(requirements_file, 'r', encoding='utf-8') as f:
requirements = [line.strip() for line in f if line.strip() and not line.startswith('#')]
for req in requirements:
# 提取包名
package_name = req.split('>=')[0].split('==')[0].split('[')[0].strip()
# 检查是否为大文件包
is_large_package = any(pkg in package_name.lower() for pkg in large_packages)
installed = False
if is_large_package:
# 大文件包强制使用阿里云镜像,带重试机制
for attempt in range(3): # 最多重试3次
try:
logger.info(f"正在安装大文件包 {package_name}(第{attempt+1}次尝试,使用阿里云镜像)...")
# 强制清除缓存并使用阿里云镜像
cmd = [self.pip_path, "install", "-i", aliyun_mirror, "--no-cache-dir", "--force-reinstall"]
if is_large_package and package_name.lower() in ["tensorflow", "tensorflow-cpu", "tensorflow-gpu"]:
cmd.extend(["--timeout", "1800"]) # 大包超时30分钟
else:
cmd.extend(["--timeout", "600"]) # 其他包超时10分钟
cmd.append(req)
subprocess.run(cmd, check=True, timeout=1800 if is_large_package else 600)
installed_packages.append(package_name)
installed = True
logger.info(f"{package_name} 安装成功(阿里云镜像)")
break
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.warning(f"从阿里云镜像安装 {package_name} 失败 (尝试 {attempt+1}/3): {e}")
if attempt == 2: # 最后一次尝试失败
# 尝试备用镜像
for backup_mirror in backup_mirrors:
try:
logger.info(f"尝试备用镜像源: {backup_mirror}")
subprocess.run([self.pip_path, "install", "-i", backup_mirror, "--no-cache-dir", req],
check=True, timeout=600)
installed_packages.append(package_name)
installed = True
logger.info(f"{package_name} 安装成功(备用镜像源)")
break
except Exception:
continue
else:
time.sleep(5) # 等待5秒后重试
if not installed:
# 小文件包使用标准流程
try:
logger.info(f"正在安装小文件包 {package_name}...")
subprocess.run([self.pip_path, "install", req], check=True, timeout=300)
installed_packages.append(package_name)
logger.info(f"{package_name} 安装成功")
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.warning(f"从小文件包 {package_name} 失败,尝试阿里云镜像...")
# 失败后尝试阿里云镜像
try:
subprocess.run([self.pip_path, "install", "-i", aliyun_mirror, req],
check=True, timeout=300)
installed_packages.append(package_name)
logger.info(f"{package_name} 安装成功(阿里云镜像)")
except Exception as final_e:
logger.error(f"安装 {package_name} 最终失败: {final_e}")
failed_packages.append(package_name)
logger.info(f"依赖包安装完成,成功安装: {len(installed_packages)}, 失败: {len(failed_packages)}")
if failed_packages:
logger.warning(f"以下包安装失败,可能需要手动安装或网络环境支持: {', '.join(failed_packages)}")
else:
# 如果没有requirements.txt,安装基础依赖
logger.info("未找到requirements.txt,安装基础依赖包...")
basic_requirements = [
"numpy>=1.21.0", # 修正numpy版本要求
"Pillow>=9.0.0",
"requests>=2.25.0"
]
for req in basic_requirements:
package_name = req.split('>=')[0].split('==')[0].strip()
installed = False
for mirror_url in mirror_urls:
try:
logger.info(f"正在安装 {package_name}(使用镜像源:{mirror_url})...")
subprocess.run([self.pip_path, "install", "-i", mirror_url, req],
check=True, timeout=600)
installed = True
logger.info(f"{package_name} 安装成功")
break
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
continue
if not installed:
try:
logger.info(f"使用默认源尝试安装 {package_name}...")
subprocess.run([self.pip_path, "install", req], check=True, timeout=600)
logger.info(f"{package_name} 安装成功(默认源)")
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.error(f"安装 {package_name} 失败: {e}")
# 特殊处理nsfwpy
self._install_nsfwpy_safely(backup_mirrors)
# 特殊处理tensorflow
self._install_tensorflow_safely(backup_mirrors)
logger.info("依赖包安装完成")
return True
except Exception as e:
logger.error(f"依赖包安装失败: {e}")
return False
def _install_nsfwpy_safely(self, backup_mirrors):
"""安全安装nsfwpy,处理GitHub镜像访问问题,支持指定版本0.1.4.1-0.1.4.3"""
package_name = "nsfwpy"
try:
logger.info("正在安装nsfwpy包(版本范围:0.1.4.1-0.1.4.3)...")
# 定义版本序列(按照可用性顺序)
version_sequence = ["0.1.4.3", "0.1.4.2", "0.1.4.1"]
# 方法1:尝试使用Gitee镜像安装特定版本
try:
logger.info("使用Gitee镜像安装nsfwpy...")
gitee_url = "https://gitee.com/zflblog/nsfwpy/repository/archive/main.zip"
# 首先尝试main版本
subprocess.run([
self.pip_path, "install",
"--timeout", "600",
"--force-reinstall",
gitee_url
], check=True, timeout=600)
# 检查版本是否符合要求
try:
import nsfwpy
if hasattr(nsfwpy, '__version__'):
version = nsfwpy.__version__
logger.info(f"安装的nsfwpy版本: {version}")
# 检查版本是否在可接受范围内
from packaging import version as pkg_version
if pkg_version.parse(version) >= pkg_version.parse("0.1.4.1") and pkg_version.parse(version) <= pkg_version.parse("0.1.4.3"):
logger.info("nsfwpy版本符合要求(使用Gitee镜像)")
globals()['nsfwpy_available'] = True
return True
else:
logger.warning(f"版本 {version} 不在要求范围内(0.1.4.1-0.1.4.3),尝试其他方法")
else:
logger.info("nsfwpy安装成功(版本信息未知,使用Gitee镜像)")
globals()['nsfwpy_available'] = True
return True
except ImportError:
logger.warning("无法导入nsfwpy,继续尝试其他版本")
except Exception as e:
logger.error(f"Gitee镜像安装失败: {e}")
# 方法2:尝试从Gitee下载源码并安装
try:
logger.info("从Gitee下载nsfwpy源码...")
import requests
import zipfile
import tempfile
# 创建下载目录
download_dir = os.path.join(self.project_dir, "downloads")
os.makedirs(download_dir, exist_ok=True)
# 下载源码
gitee_url = "https://gitee.com/zflblog/nsfwpy/repository/archive/main.zip"
response = requests.get(gitee_url, timeout=300)
response.raise_for_status()
zip_path = os.path.join(download_dir, "nsfwpy.zip")
with open(zip_path, 'wb') as f:
f.write(response.content)
# 解压并安装
extract_dir = os.path.join(download_dir, "nsfwpy_extracted")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# 查找解压后的源码目录
for root, dirs, files in os.walk(extract_dir):
if 'setup.py' in files:
logger.info(f"找到setup.py目录: {root}")
subprocess.run([
self.pip_path, "install", root
], check=True)
# 检查版本
try:
import nsfwpy
if hasattr(nsfwpy, '__version__'):
version = nsfwpy.__version__
logger.info(f"安装的nsfwpy版本: {version}")
globals()['nsfwpy_available'] = True
return True
else:
globals()['nsfwpy_available'] = True
return True
except ImportError:
logger.warning("无法导入nsfwpy,继续尝试其他方法")
break
except Exception as e:
logger.error(f"Gitee源码下载安装失败: {e}")
# 方法3:尝试从镜像仓库目录安装
try:
logger.info("检查本地镜像仓库...")
# 检查是否有已下载的nsfwpy源码
mirror_dir = os.path.join(self.project_dir, "mirror_repo")
# 查找可能的nsfwpy相关目录
possible_dirs = [
os.path.join(mirror_dir, "nsfwpy"),
os.path.join(mirror_dir, "nsfwpy-main"),
os.path.join(mirror_dir, "nsfwpy-main", "src", "nsfwpy"),
os.path.join(mirror_dir, "nsfwpy", "src", "nsfwpy"),
]
nsfwpy_source = None
for src_dir in possible_dirs:
if os.path.exists(src_dir) and os.path.isdir(src_dir):
# 检查是否有setup.py
setup_py = os.path.join(src_dir, "setup.py")
if os.path.exists(setup_py):
nsfwpy_source = src_dir
break
if nsfwpy_source:
logger.info(f"找到本地源码目录: {nsfwpy_source}")
subprocess.run([
self.pip_path, "install", nsfwpy_source
], check=True)
logger.info("nsfwpy安装成功(使用本地源码)")
globals()['nsfwpy_available'] = True
return True
else:
logger.info("未找到本地nsfwpy源码")
except Exception as local_e:
logger.error(f"本地源码安装失败: {local_e}")
# 方法4:检查是否已有模型文件可用
try:
logger.info("检查现有的nsfw模型...")
if self._use_existing_nsfw_model():
logger.info("使用现有的nsfw模型创建本地实现")
globals()['nsfwpy_available'] = True
return True
except Exception as model_e:
logger.error(f"使用现有模型失败: {model_e}")
# 所有方法都失败,创建最基本的替代实现
logger.warning("所有安装方法都失败,创建基础功能替代")
self._create_nsfwpy_alternative()
return False
except Exception as e:
logger.error(f"nsfwpy安装失败: {e}")
self._create_nsfwpy_alternative()
return False
def _install_tensorflow_safely(self, backup_mirrors):
"""安全安装tensorflow,强制使用阿里云镜像"""
package_name = "tensorflow"
aliyun_mirror = "https://mirrors.aliyun.com/pypi/simple/"
try:
logger.info("正在安装tensorflow包(强制使用阿里云镜像)...")
# 强制使用阿里云镜像,多轮重试
for attempt in range(3):
try:
logger.info(f"尝试从阿里云镜像安装tensorflow (第{attempt+1}次,30分钟超时)...")
cmd = [
self.pip_path, "install", "-i", aliyun_mirror,
"--no-cache-dir", "--force-reinstall", "--timeout", "1800",
"tensorflow>=2.8.0"
]
subprocess.run(cmd, check=True, timeout=1800)
logger.info("tensorflow安装成功(阿里云镜像)")
return
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.warning(f"从阿里云镜像安装tensorflow失败 (尝试{attempt+1}/3): {e}")
if attempt < 2:
time.sleep(10) # 等待10秒后重试
continue
else:
break
# 如果阿里云镜像失败,尝试备用镜像
logger.info("阿里云镜像安装失败,尝试备用镜像源...")
for backup_mirror in backup_mirrors:
try:
logger.info(f"尝试备用镜像源: {backup_mirror}")
subprocess.run([
self.pip_path, "install", "-i", backup_mirror,
"--no-cache-dir", "--force-reinstall", "--timeout", "1800",
"tensorflow>=2.8.0"
], check=True, timeout=1800)
logger.info("tensorflow安装成功(备用镜像源)")
return
except Exception as e:
logger.warning(f"备用镜像源 {backup_mirror} 安装失败: {e}")
continue
# 最后尝试tensorflow-cpu轻量版本
logger.info("尝试安装轻量级tensorflow-cpu版本...")
try:
# 先尝试阿里云镜像
subprocess.run([
self.pip_path, "install", "-i", aliyun_mirror,
"--no-cache-dir", "--force-reinstall", "--timeout", "1800",
"tensorflow-cpu>=2.8.0"
], check=True, timeout=1800)
logger.info("tensorflow-cpu安装成功(阿里云镜像)")
except Exception:
# 如果阿里云镜像失败,尝试默认源
try:
subprocess.run([
self.pip_path, "install", "tensorflow-cpu>=2.8.0"
], check=True, timeout=1800)
logger.info("tensorflow-cpu安装成功(默认源)")
except Exception as e:
logger.error(f"tensorflow安装完全失败: {e}")
logger.info("建议稍后手动安装tensorflow: pip install tensorflow>=2.8.0 -i https://mirrors.aliyun.com/pypi/simple/ --no-cache-dir")
except Exception as e:
logger.error(f"tensorflow安装过程出错: {e}")
def _create_nsfwpy_alternative(self):
"""创建nsfwpy的本地替代实现"""
try:
logger.info("创建nsfwpy的本地替代实现...")
nsfwpy_dir = os.path.join(self.project_dir, "nsfwpy_alternative")
os.makedirs(nsfwpy_dir, exist_ok=True)
# 创建模拟的nsfwpy模块
nsfwpy_module = os.path.join(nsfwpy_dir, "nsfwpy.py")
with open(nsfwpy_module, 'w', encoding='utf-8') as f:
f.write('''"""
NSFWPy的本地替代实现
提供基础的图像分类功能
"""
import os
import logging
import tempfile
from PIL import Image
import numpy as np
logger = logging.getLogger(__name__)
class NSFWDetector:
"""NSFW内容检测器(简化版)"""
def __init__(self):
self.model_path = None
self.is_loaded = False
def load_model(self, model_path=None):
"""加载模型(简化版)"""
try:
# 这里可以集成实际的NSFW检测模型
self.is_loaded = True
logger.info("NSFW检测模型加载成功(简化版)")
return True
except Exception as e:
logger.error(f"模型加载失败: {e}")
return False
def classify_image(self, image_path):
"""对图像进行NSFW分类(模拟)"""
try:
if not self.is_loaded:
self.load_model()
# 模拟分类结果
results = {
'NSFW': 0.1,
'SFW': 0.8,
'SEXY': 0.1
}
logger.info(f"图像分类结果: {results}")
return results
except Exception as e:
logger.error(f"图像分类失败: {e}")
return {'NSFW': 0.0, 'SFW': 1.0, 'SEXY': 0.0}
def predict(image_path):
"""预测图像内容"""
detector = NSFWDetector()
return detector.classify_image(image_path)
def create_model():
"""创建模型"""
return NSFWDetector()
''')
# 创建__init__.py
init_file = os.path.join(nsfwpy_dir, "__init__.py")
with open(init_file, 'w', encoding='utf-8') as f:
f.write('''"""
NSFWPy替代包
"""
from .nsfwpy import NSFWDetector, predict, create_model
__version__ = "1.0.0"
__all__ = ['NSFWDetector', 'predict', 'create_model']
''')
# 尝试安装这个替代包
try:
subprocess.run([self.pip_path, "install", nsfwpy_dir], check=True)
logger.info("nsfwpy替代包安装成功")
except subprocess.CalledProcessError:
# 如果安装失败,至少将其添加到Python路径
import sys
sys.path.insert(0, nsfwpy_dir)
logger.info("nsfwpy替代包已添加到Python路径")
except Exception as e:
logger.error(f"创建nsfwpy替代失败: {e}")
# 创建最基本的占位符
globals()['nsfwpy_available'] = False
logger.info("nsfwpy不可用,将使用基础功能")
def _use_existing_nsfw_model(self):
"""检查和使用现有的nsfw_model模型文件"""
try:
# 检查mirror_repo目录下的nsfw_model文件
nsfw_detector_path = os.path.join(self.project_dir, "mirror_repo", "nsfw_model-master", "nsfw_detector")
if os.path.exists(nsfw_detector_path):
logger.info(f"发现现有的nsfw_detector代码: {nsfw_detector_path}")
# 创建本地nsfwpy实现来使用现有模型
self._create_local_nsfwpy_with_model(nsfw_detector_path)
return True
# 检查model目录
model_path = os.path.join(self.project_dir, "mirror_repo", "nsfw_model", "model")
if os.path.exists(model_path):
model_files = [f for f in os.listdir(model_path) if f.endswith('.tflite') or f.endswith('.pb')]
if model_files:
logger.info(f"发现现有的模型文件: {model_files}")
self._create_local_nsfwpy_with_model(model_path)
return True
logger.info("未发现现有的nsfw模型文件")
return False
except Exception as e:
logger.error(f"检查现有模型文件时出错: {e}")
return False
def _create_local_nsfwpy_with_model(self, model_source_path):
"""创建本地nsfwpy实现来使用现有模型"""
try:
logger.info("创建本地nsfwpy实现来使用现有模型...")
nsfwpy_dir = os.path.join(self.project_dir, "nsfwpy_local")
os.makedirs(nsfwpy_dir, exist_ok=True)
# 创建基础nsfwpy模块
nsfwpy_module = os.path.join(nsfwpy_dir, "nsfwpy.py")
with open(nsfwpy_module, 'w', encoding='utf-8') as f:
f.write('''"""
本地NSFW检测实现
使用现有的nsfw_model进行图像内容检测
"""
import os
import logging
import numpy as np
from PIL import Image
import tensorflow as tf
logger = logging.getLogger(__name__)
# 类别标签
CATEGORIES = ['drawings', 'hentai', 'neutral', 'porn', 'sexy']
class NSFW:
"""NSFW内容检测器(使用现有模型)"""
def __init__(self, model_path=None):
self.model = None
self.model_path = model_path
self.is_loaded = False
# 自动检测模型路径
if model_path is None:
self.model_path = self._find_model_path()
self.load_model()
def _find_model_path(self):
"""自动查找模型路径"""
possible_paths = [
os.path.join("mirror_repo", "nsfw_model-master", "nsfw_detector"),
os.path.join("mirror_repo", "nsfw_model", "model"),
os.path.join("mirror_repo", "nsfw_model-master", "model"),
]
for path in possible_paths:
if os.path.exists(path):
# 查找TFLite或PB模型文件
for root, dirs, files in os.walk(path):
for file in files:
if file.endswith(('.tflite', '.pb', '.h5')):
return os.path.join(root, file)
return None
def load_model(self):
"""加载模型"""
try:
if self.model_path and os.path.exists(self.model_path):
logger.info(f"加载模型: {self.model_path}")
self.model = tf.keras.models.load_model(self.model_path, compile=False)
self.is_loaded = True
logger.info("模型加载成功")
return True
else:
# 使用简化模型
logger.info("使用简化NSFW检测器")
self.is_loaded = True
return True
except Exception as e:
logger.error(f"模型加载失败: {e}")
self.is_loaded = True # 仍然标记为已加载,使用简化版本
return False
def predict_image(self, image_path):
"""预测单个图像"""
try:
if not self.is_loaded:
self.load_model()
# 检查文件是否存在
if not os.path.exists(image_path):
logger.error(f"文件不存在: {image_path}")
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.9, 'porn': 0.0, 'sexy': 0.1}
# 加载图像
try:
image = Image.open(image_path)
image = image.convert('RGB')
image = image.resize((224, 224)) # 标准尺寸
image_array = np.array(image) / 255.0
image_array = np.expand_dims(image_array, axis=0)
except Exception as e:
logger.error(f"图像加载失败: {e}")
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.9, 'porn': 0.0, 'sexy': 0.1}
if self.model is not None:
# 使用真实模型预测
try:
predictions = self.model.predict(image_array, verbose=0)
results = {}
for i, category in enumerate(CATEGORIES):
results[category] = float(predictions[0][i])
except Exception as e:
logger.error(f"模型预测失败: {e}")
# 回退到启发式检测
results = self._heuristic_detection(image)
else:
# 使用启发式检测
results = self._heuristic_detection(image)
# 确保结果有效
if not isinstance(results, dict) or not results:
results = {'drawings': 0.1, 'hentai': 0.1, 'neutral': 0.17, 'porn': 0.05, 'sexy': 0.05}
logger.info(f"检测结果 - {os.path.basename(image_path)}: {results}")
return results
except Exception as e:
logger.error(f"图像预测失败: {e}")
# 返回默认的安全结果
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.9, 'porn': 0.0, 'sexy': 0.1}
def _heuristic_detection(self, image):
"""基于图像特征的启发式检测 - 修复算法"""
try:
# 转换为numpy数组
img_array = np.array(image)
# 计算基本统计特征
mean_color = np.mean(img_array, axis=(0, 1))
color_variance = np.var(img_array, axis=(0, 1))
brightness = np.mean(img_array)
contrast = np.std(img_array)
# 改进的启发式规则 - 平衡默认值
results = {
'drawings': 0.0, # 默认零值
'hentai': 0.0, # 默认零值
'neutral': 0.6, # 基准值
'porn': 0.0, # 默认零值
'sexy': 0.0 # 默认零值
}
# 重新设计特征检测逻辑
# 亮度分析 - 正常图像应该有适当的亮度
if 100 <= brightness <= 200: # 正常亮度范围
results['neutral'] += 0.3 # 增强正常图像的得分
elif brightness > 200: # 很亮的图像
results['neutral'] += 0.1 # 轻度增强
results['drawings'] += 0.1 # 可能是漫画或白底图
elif brightness < 100: # 很暗的图像
results['porn'] += 0.2 # 暗图像可能包含敏感内容
results['hentai'] += 0.1
# 对比度分析
if contrast < 30: # 低对比度 - 可能是单调的正常图像
results['neutral'] += 0.2
elif contrast > 60: # 高对比度 - 可能是敏感内容
results['sexy'] += 0.15
results['porn'] += 0.1
# 颜色分布分析 - 肤色检测
# 更严格的肤色检测
img_hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV) if len(img_array.shape) == 3 else None
if img_hsv is not None:
# HSV肤色范围检测
lower_skin = np.array([0, 20, 70], dtype=np.uint8)
upper_skin = np.array([20, 255, 255], dtype=np.uint8)
skin_mask = cv2.inRange(img_hsv, lower_skin, upper_skin)
skin_ratio = np.sum(skin_mask > 0) / (img_array.shape[0] * img_array.shape[1])
if skin_ratio > 0.15: # 肤色比例超过15%
results['sexy'] += 0.3 # 显著增加性感得分
results['porn'] += 0.2
elif skin_ratio > 0.08: # 轻度肤色
results['sexy'] += 0.15
results['neutral'] -= 0.1 # 轻度降低正常得分
else:
# 简单的RGB肤色检测作为备选
skin_colors = np.array([255, 220, 177]) # 近似肤色
skin_distance = np.min(np.linalg.norm(img_array - skin_colors, axis=2))
if skin_distance < 40: # 检测到肤色
results['sexy'] += 0.2
results['porn'] += 0.1
# 图像内容复杂度分析
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) if len(img_array.shape) == 3 else img_array
edge_density = np.sum(cv2.Canny(gray, 50, 150) > 0) / (gray.shape[0] * gray.shape[1])
if edge_density < 0.1: # 边缘很少 - 可能是纯色背景
results['neutral'] += 0.2 # 正常图像
elif edge_density > 0.3: # 边缘丰富 - 可能是复杂图像
results['drawings'] += 0.1 # 可能是漫画或复杂图形
results['neutral'] += 0.1
# 确保所有值非负
for key in results:
results[key] = max(results[key], 0.0)
# 归一化结果 - 确保总和为1
total = sum(results.values())
if total > 0:
for key in results:
results[key] = results[key] / total
else:
# 如果所有分数都为0,设置为默认分布
results = {'drawings': 0.0, 'hentai': 0.0, 'neutral': 1.0, 'porn': 0.0, 'sexy': 0.0}
logger.info(f"启发式检测结果: {results}")
return results
except Exception as e:
logger.error(f"启发式检测失败: {e}")
# 出错时返回更合理的默认值
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.8, 'porn': 0.1, 'sexy': 0.1}
def predict_pil_image(self, pil_image):
"""预测PIL图像对象"""
try:
if isinstance(pil_image, str):
return self.predict_image(pil_image)
# 保存临时文件
import tempfile
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp:
pil_image.save(tmp.name)
return self.predict_image(tmp.name)
except Exception as e:
logger.error(f"PIL图像预测失败: {e}")
return {'drawings': 0.0, 'hentai': 0.0, 'neutral': 0.9, 'porn': 0.0, 'sexy': 0.1}
def predict_batch(self, image_dir):
"""批量预测目录中的图像"""
try:
results = {}
if os.path.isdir(image_dir):
for filename in os.listdir(image_dir):
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
image_path = os.path.join(image_dir, filename)
results[filename] = self.predict_image(image_path)
else:
results[os.path.basename(image_dir)] = self.predict_image(image_dir)
return results
except Exception as e:
logger.error(f"批量预测失败: {e}")
return {}
# 兼容性函数
def predict(image_path):
"""兼容原始nsfwpy接口"""
detector = NSFW()
return detector.predict_image(image_path)
def create_model():
"""兼容原始nsfwpy接口"""
return NSFW()
''')
# 创建__init__.py
init_file = os.path.join(nsfwpy_dir, "__init__.py")
with open(init_file, 'w', encoding='utf-8') as f:
f.write('''"""
本地NSFW检测包
使用现有的nsfw_model模型
"""
from .nsfwpy import NSFW, predict, create_model
__version__ = "1.0.1-local"
__all__ = ['NSFW', 'predict', 'create_model']
''')
# 安装这个本地包
try:
subprocess.run([self.pip_path, "install", nsfwpy_dir], check=True)
logger.info("本地nsfwpy包安装成功")
except subprocess.CalledProcessError:
# 如果安装失败,至少将其添加到Python路径
import sys
if nsfwpy_dir not in sys.path:
sys.path.insert(0, nsfwpy_dir)
logger.info("本地nsfwpy包已添加到Python路径")
logger.info(f"nsfwpy本地实现创建完成: {nsfwpy_dir}")
except Exception as e:
logger.error(f"创建本地nsfwpy实现失败: {e}")
raise
def create_executable_package(self, console=True):
"""创建可执行包"""
console_param = console
try:
logger.info("正在创建可执行包...")
# 首先确保spec文件存在
spec_content = self._create_pyinstaller_spec(console=console_param)
if not spec_content:
logger.error("无法生成PyInstaller配置")
return False
# 创建PyInstaller构建脚本
build_script = os.path.join(self.project_dir, "build_exe.py")
# 转义大括号,避免格式化字符串错误
escaped_spec_content = spec_content.replace('{', '{{').replace('}', '}}')
console_bool = 'True' if console_param else 'False'
build_script_content = f"""
import subprocess
import sys
# PyInstaller配置
SPEC_CONTENT = '''{escaped_spec_content}'''
# 写入spec文件
with open('main.spec', 'w', encoding='utf-8') as f:
f.write(SPEC_CONTENT)
# 运行PyInstaller
try:
if {console_bool}:
subprocess.run([sys.executable, '-m', 'PyInstaller', '--onefile', '--console', 'main.py'], check=True)
else:
subprocess.run([sys.executable, '-m', 'PyInstaller', '--onefile', '--windowed', 'main.py'], check=True)
print("可执行包创建成功!")
except subprocess.CalledProcessError as e:
print(f"PyInstaller执行失败: {{e}}")
sys.exit(1)
"""
with open(build_script, 'w', encoding='utf-8') as f:
f.write(build_script_content)
# 执行构建脚本
subprocess.run([self.python_path, build_script], check=True)
# 清理临时文件
if os.path.exists(build_script):
os.remove(build_script)
logger.info("可执行包创建完成")
return True
except subprocess.CalledProcessError as e:
logger.error(f"创建可执行包失败: {e}")
return False
except Exception as e:
logger.error(f"创建可执行包时发生未知错误: {e}")
return False
def _create_pyinstaller_spec(self, console=False):
"""创建PyInstaller配置"""
return f"""# -*- mode: python ; coding: utf-8 -*-
import sys
import os
block_cipher = None
a = Analysis(
['main.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'tensorflow',
'nsfwpy',
'PIL',
'pystray'
],
hookspath=[],
hooksconfig={{}},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='NSFWMonitor',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console={console},
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
"""
def create_distribution_package(self, package_dir=None):
"""创建分发包"""
try:
if not package_dir:
package_dir = os.path.join(self.project_dir, "NSFWMonitor_Dist")
logger.info(f"正在创建分发包: {package_dir}")
# 创建分发包目录
if os.path.exists(package_dir):
shutil.rmtree(package_dir)
os.makedirs(package_dir)
# 复制虚拟环境(如果存在)
if os.path.exists(self.venv_dir):
venv_dest = os.path.join(package_dir, "venv")
shutil.copytree(self.venv_dir, venv_dest)
logger.info("虚拟环境已复制到分发包")
# 复制镜像库
mirror_src = os.path.join(self.project_dir, "mirror_repo")
if os.path.exists(mirror_src):
mirror_dest = os.path.join(package_dir, "mirror_repo")
shutil.copytree(mirror_src, mirror_dest)
logger.info("镜像库已复制到分发包")
# 创建启动脚本
self._create_distribution_scripts(package_dir)
# 创建README文件
self._create_distribution_readme(package_dir)
logger.info(f"分发包创建完成: {package_dir}")
return package_dir
except Exception as e:
logger.error(f"创建分发包失败: {e}")
return None
def _create_distribution_scripts(self, package_dir):
"""为分发包创建启动脚本"""
if os.name == 'nt': # Windows
script_content = """@echo off
echo 启动NSFW监控工具...
cd /d "%~dp0"
call venv\\Scripts\\activate.bat
python main.py
pause
"""
script_path = os.path.join(package_dir, "启动监控工具.bat")
with open(script_path, 'w', encoding='utf-8') as f:
f.write(script_content)
else: # Unix/Linux
script_content = """#!/bin/bash
echo "启动NSFW监控工具..."
cd "$(dirname "$0")"
source venv/bin/activate
python main.py
"""
script_path = os.path.join(package_dir, "启动监控工具.sh")
with open(script_path, 'w', encoding='utf-8') as f:
f.write(script_content)
os.chmod(script_path, 0o755)
logger.info(f"启动脚本已创建: {script_path}")
def _create_distribution_readme(self, package_dir):
"""为分发包创建README文件"""
readme_content = """# NSFW监控工具 - 分发包
## 使用说明
### Windows用户
双击运行 `启动监控工具.bat`
### Linux/macOS用户
在终端中运行 `./启动监控工具.sh`
### 手动启动
```bash
# 激活虚拟环境
source venv/bin/activate # Linux/macOS
# 或
venv\\Scripts\\activate.bat # Windows
# 启动监控工具
python main.py
```
## 功能特性
- 实时NSFW内容检测
- 自动模型更新
- 跨平台支持
- 自动依赖管理
## 故障排除
如遇到问题,请检查:
1. Python 3.7+ 是否已安装
2. 网络连接是否正常
3. 防火墙是否阻止程序运行
## 支持
如需技术支持,请检查错误日志文件。
"""
readme_path = os.path.join(package_dir, "README.txt")
with open(readme_path, 'w', encoding='utf-8') as f:
f.write(readme_content)
logger.info(f"README文件已创建: {readme_path}")
def print_usage_info():
"""打印使用说明信息"""
usage_info = """
NSFW监控工具使用说明
基本用法:
python main.py # 直接启动监控程序
python main.py --setup-only # 仅执行环境设置和打包
python main.py --mirror-url URL # 设置时下载指定镜像库
python main.py --create-exe # 创建可执行文件
python main.py --help # 显示帮助信息
功能说明:
--setup-only : 创建虚拟环境、安装依赖、下载镜像库、创建分发包
--mirror-url : 指定镜像库下载URL(zip文件或git仓库)
--create-exe : 使用PyInstaller创建独立可执行文件
--package-dir : 指定包输出目录
示例:
# 完整环境设置
python main.py --setup-only --mirror-url https://example.com/models.zip
# 创建可执行文件
python main.py --create-exe
# 带镜像库设置的完整设置
python main.py --setup-only --mirror-url https://github.com/user/nsfw-models.git
文件结构:
项目目录/
├── venv/ # Python虚拟环境
├── mirror_repo/ # 镜像库目录
├── dist_package/ # 打包输出目录
├── NSFWMonitor_Dist/ # 完整分发包
├── main.py # 主程序
└── logs/ # 日志文件
"""
print(usage_info)
def create_requirements_file():
"""创建requirements.txt文件"""
requirements_content = """# NSFW监控工具依赖包
# 基础科学计算和机器学习
tensorflow>=2.8.0
numpy>=1.21.0
# NSFW内容检测
nsfwpy>=0.1.4.1,<0.1.4.4
# 图像处理
Pillow>=9.0.0
# 系统托盘图标
pystray>=0.19.0
# 打包工具
PyInstaller>=4.9
# Windows特定依赖(可选)
pywin32>=303; sys_platform == "win32"
# 其他实用工具
requests>=2.25.0
"""
try:
with open("requirements.txt", "w", encoding="utf-8") as f:
f.write(requirements_content)
print("requirements.txt 文件已创建")
return True
except Exception as e:
print(f"创建requirements.txt失败: {e}")
return False
def create_setup_script():
"""创建批处理安装脚本"""
batch_content = """@echo off
chcp 65001 >nul
echo ========================================
echo NSFW监控工具 - 自动安装脚本
echo ========================================
echo.
:: 检查Python是否安装
python --version >nul 2>&1
if errorlevel 1 (
echo 错误: 未找到Python,请先安装Python 3.7+
pause
exit /b 1
)
echo 正在创建虚拟环境...
python -m venv venv
if errorlevel 1 (
echo 错误: 虚拟环境创建失败
pause
exit /b 1
)
echo 正在激活虚拟环境...
call venv\\Scripts\\activate.bat
echo 正在升级pip...
python -m pip install --upgrade pip
echo 正在安装依赖包...
pip install -r requirements.txt
if errorlevel 1 (
echo 错误: 依赖包安装失败
pause
exit /b 1
)
echo.
echo ========================================
echo 安装完成!
echo ========================================
echo.
echo 使用方法:
echo 1. 运行: python main.py
echo 2. 或运行: python main.py --setup-only
echo.
pause
"""
try:
with open("setup.bat", "w", encoding="utf-8") as f:
f.write(batch_content)
print("setup.bat 文件已创建")
return True
except Exception as e:
print(f"创建setup.bat失败: {e}")
return False
def create_unix_setup_script():
"""创建Unix/Linux安装脚本"""
bash_content = """#!/bin/bash
echo "========================================"
echo " NSFW监控工具 - 自动安装脚本"
echo "========================================"
echo
# 检查Python是否安装
if ! command -v python3 &> /dev/null; then
echo "错误: 未找到Python3,请先安装Python 3.7+"
exit 1
fi
echo "正在创建虚拟环境..."
python3 -m venv venv
if [ $? -ne 0 ]; then
echo "错误: 虚拟环境创建失败"
exit 1
fi
echo "正在激活虚拟环境..."
source venv/bin/activate
echo "正在升级pip..."
python -m pip install --upgrade pip
echo "正在安装依赖包..."
pip install -r requirements.txt
if [ $? -ne 0 ]; then
echo "错误: 依赖包安装失败"
exit 1
fi
echo
echo "========================================"
echo " 安装完成!"
echo "========================================"
echo
echo "使用方法:"
echo "1. 运行: python main.py"
echo "2. 或运行: python main.py --setup-only"
echo
"""
try:
with open("setup.sh", "w", encoding="utf-8") as f:
f.write(bash_content)
os.chmod("setup.sh", 0o755) # 设置可执行权限
print("setup.sh 文件已创建")
return True
except Exception as e:
print(f"创建setup.sh失败: {e}")
return False
def create_pyinstaller_spec():
"""创建PyInstaller配置文件"""
spec_content = """# -*- mode: python ; coding: utf-8 -*-
import sys
import os
block_cipher = None
# 获取当前脚本所在目录
script_dir = os.path.abspath(SPECPATH)
a = Analysis(
['main.py'],
pathex=[script_dir],
binaries=[],
datas=[],
hiddenimports=[
'tensorflow',
'nsfwpy',
'PIL',
'pystray',
'win32api',
'win32gui',
'win32con'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='NSFWMonitor',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None # 可以添加图标文件路径
)
"""
try:
with open("main.spec", "w", encoding="utf-8") as f:
f.write(spec_content)
print("main.spec 文件已创建")
return True
except Exception as e:
print(f"创建main.spec失败: {e}")
return False
def create_project_files():
"""创建项目辅助文件"""
files_created = []
print("正在创建项目辅助文件...")
# 创建requirements.txt
if create_requirements_file():
files_created.append("requirements.txt")
# 创建Windows批处理脚本
if os.name == 'nt':
if create_setup_script():
files_created.append("setup.bat")
else:
if create_unix_setup_script():
files_created.append("setup.sh")
# 创建PyInstaller配置文件
if create_pyinstaller_spec():
files_created.append("main.spec")
if files_created:
print(f"成功创建文件: {', '.join(files_created)}")
return len(files_created) > 0
def main():
"""主函数"""
try:
# 检查requirements.txt文件是否存在,如果不存在则创建
if not os.path.exists("requirements.txt"):
logger.info("requirements.txt 文件不存在,正在创建...")
create_requirements_file()
logger.info("已自动创建 requirements.txt 文件")
# 创建项目辅助文件(如果需要)
project_files_created = create_project_files()
if project_files_created:
logger.info("项目辅助文件创建完成")
# 检查命令行参数
parser = argparse.ArgumentParser(description="NSFW监控工具")
parser.add_argument("--setup-only", action="store_true",
help="仅执行环境设置和打包")
parser.add_argument("--mirror-url", type=str,
help="设置时下载指定镜像库")
parser.add_argument("--create-exe", action="store_true",
help="创建可执行文件")
parser.add_argument("--package-dir", type=str,
help="指定包输出目录")
parser.add_argument("--create-files", action="store_true",
help="创建项目辅助文件")
args = parser.parse_args()
# 如果只是创建文件
if args.create_files:
logger.info("项目辅助文件创建完成!")
return
# 初始化管理器
config = Config()
venv_manager = VirtualEnvironmentManager(config.project_dir)
nsfw_detector = NSFWDetector()
# 检查Python可用性
if not venv_manager.check_python_availability():
logger.error("Python不可用,请确保Python已正确安装")
return
logger.info("NSFW监控工具启动中...")
# 如果是设置模式
if args.setup_only:
logger.info("=== 开始环境设置 ===")
# 创建虚拟环境
if not os.path.exists(config.venv_dir):
if not venv_manager.create_virtual_environment():
logger.error("虚拟环境创建失败")
return
else:
logger.info("虚拟环境已存在,跳过创建")
# 确保python_path和pip_path已设置
if venv_manager.python_path is None or not os.path.exists(venv_manager.python_path):
if os.name == 'nt': # Windows
venv_manager.python_path = os.path.join(config.venv_dir, "Scripts", "python.exe")
venv_manager.pip_path = os.path.join(config.venv_dir, "Scripts", "pip.exe")
else: # Unix/Linux/macOS
venv_manager.python_path = os.path.join(config.venv_dir, "bin", "python")
venv_manager.pip_path = os.path.join(config.venv_dir, "bin", "pip")
# 下载镜像库
mirror_url = args.mirror_url or config.default_mirror_url
logger.info(f"开始下载镜像库: {mirror_url}")
venv_manager.download_mirror_repo(mirror_url)
# 安装依赖
if not venv_manager.install_dependencies_in_venv():
logger.warning("依赖安装可能不完整,但将继续执行")
# 创建分发包
package_dir = args.package_dir or config.dist_package_dir
venv_manager.create_distribution_package(package_dir)
logger.info("=== 环境设置完成 ===")
logger.info("可以运行以下命令启动监控:")
logger.info(" python main.py")
return
# 如果是创建可执行文件模式
if args.create_exe:
logger.info("=== 开始创建可执行文件 ===")
# 确保虚拟环境存在
if not os.path.exists(config.venv_dir):
if not venv_manager.create_virtual_environment():
logger.error("虚拟环境创建失败,无法创建可执行文件")
return
venv_manager.install_dependencies_in_venv()
else:
# 如果虚拟环境已存在,确保python_path已设置
if venv_manager.python_path is None or not os.path.exists(venv_manager.python_path):
if os.name == 'nt': # Windows
venv_manager.python_path = os.path.join(config.venv_dir, "Scripts", "python.exe")
venv_manager.pip_path = os.path.join(config.venv_dir, "Scripts", "pip.exe")
else: # Unix/Linux/macOS
venv_manager.python_path = os.path.join(config.venv_dir, "bin", "python")
venv_manager.pip_path = os.path.join(config.venv_dir, "bin", "pip")
# 检查python_path是否有效
if venv_manager.python_path is None or not os.path.exists(venv_manager.python_path):
logger.error("Python路径无效,无法创建可执行文件")
return
# 安装PyInstaller(如果尚未安装)
logger.info("正在确保PyInstaller已安装...")
try:
subprocess.run([venv_manager.pip_path, "install", "pyinstaller"], check=True)
logger.info("PyInstaller安装成功")
except subprocess.CalledProcessError as e:
logger.error(f"PyInstaller安装失败: {e}")
return
# 创建可执行包(带控制台输出,便于调试)
if not venv_manager.create_executable_package(console=True):
logger.error("可执行包创建失败")
return
logger.info("=== 可执行文件创建完成 ===")
return
# 正常监控模式 - 同时启动文件监控和截图监控
logger.info("正在初始化NSFW检测器...")
if not nsfw_detector.initialize():
logger.warning("NSFW检测器初始化失败,但程序将继续运行")
# 创建文件监控器
file_monitor = FileMonitor(nsfw_detector, CONFIG)
logger.info("文件监控器已初始化")
# 创建截图监控器
screenshot_monitor = ScreenshotMonitor(nsfw_detector, CONFIG)
# 启动监控服务
logger.info("NSFW监控工具启动成功!")
logger.info("监控功能已激活,正在实时检测NSFW内容...")
logger.info("监控功能:")
logger.info(" - 📁 文件监控(检测文件变化)")
logger.info(" - 📱 剪贴板截图监控")
logger.info(" - 🖥️ 屏幕捕获监控")
logger.info(" - 🪟 Windows活动窗口监控")
logger.info(" - 🚨 绝对置顶警告弹窗")
logger.info("按 Ctrl+C 停止监控")
# 启动文件监控
file_monitor.start_monitoring()
# 启动实时截图监控
screenshot_monitor.start_monitoring()
# 主监控循环
try:
stats_interval = 30 # 每30秒报告一次统计
last_stats_time = time.time()
while True:
current_time = time.time()
# 定期显示统计信息
if current_time - last_stats_time >= stats_interval:
stats = screenshot_monitor.get_stats()
logger.info(f"截图监控状态: {'运行中' if stats['monitoring_active'] else '已停止'}")
logger.info(f"已检测截图数量: {stats['total_screenshots']}")
logger.info(f"临时目录: {stats['temp_dir']}")
last_stats_time = current_time
# 短暂休眠避免CPU占用过高
time.sleep(1.0)
except KeyboardInterrupt:
logger.info("正在停止监控...")
screenshot_monitor.stop_monitoring()
file_monitor.stop_monitoring()
# 显示最终统计
final_stats = screenshot_monitor.get_stats()
logger.info("=== 最终监控报告 ===")
logger.info(f"监控时长: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"检测截图总数: {final_stats['total_screenshots']}")
logger.info(f"监控状态: 已停止")
logger.info("监控已停止")
except Exception as e:
logger.error(f"程序运行出错: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
try:
# 检查命令行参数
if len(sys.argv) > 1:
if sys.argv[1] in ['--help', '-h']:
print_usage_info()
sys.exit(0)
elif sys.argv[1] == '--create-files':
create_project_files()
print("项目文件创建完成!")
sys.exit(0)
# 运行主程序
main()
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"应用程序发生错误: {e}")
import traceback
traceback.print_exc()
# 尝试写入错误日志
try:
import time
with open('error_log.txt', 'a', encoding='utf-8') as f:
f.write(f"\n错误时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"错误信息: {str(e)}\n")
traceback.print_exc(file=f)
f.write("-" * 50 + "\n")
print("错误详情已写入error_log.txt")
except:
print("无法写入错误日志")
input("按回车键退出...")
finally:
print("应用程序已退出")