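# [Python] Source copied from a forum code viewer (the original first line was the viewer's "view as plain text / copy code" header).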
import os
import sys
import io
import socket
import mimetypes
import threading
import time
import queue
import shutil
import re
import zipfile # 新增:用于打包下载
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import urllib.parse
import base64
import platform
import subprocess
from email.header import Header
import winreg
import segno
import uuid
import html
import json
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, \
QPushButton, QTextEdit, QFileDialog, QMessageBox, QSystemTrayIcon, QMenu, QAction, QSplitter
from PyQt5.QtCore import Qt, QSettings, QTimer, pyqtSignal, QSize, QPoint
from PyQt5.QtGui import QIcon, QPixmap, QFont
# True when sys.frozen is set or a "__compiled__" global exists -- presumably
# the markers left by executable bundlers/compilers (TODO confirm which ones).
IS_FROZEN = getattr(sys, 'frozen', False) or "__compiled__" in globals()
# Presumably the default TCP port for the HTTP server; its use is outside this chunk.
DEFAULT_PORT = 5995
# Default folder to share: "Downloads" inside the current user's home directory.
DEFAULT_SHARE_PATH = str(Path.home() / "Downloads")
# Module-level slot for a server object; nothing in this chunk assigns it.
server_instance = None
# Base64 text -- presumably the application icon image; decoded outside this chunk.
ICON_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAAAXNSR0IArs4c6QAABGZJREFUWEe9V0mMFGUU/t5fVV203UxY3BhmRhlAvYwLonNyLgY7BoIHMTFcEBxITNR4MS5w5aDRuEQ9iHLzhAcvkhghLkziTNAElSjMkNBmgLA1E0en9/qfeX9V9fRS1QtEXtLp1F/1/ve97fvfTwBw5sCGVx1Hv81gELQs1URWSBM08wxr/djw7pOXGj64wQf689NHMymn+jUUWxAAxE1bKjADslzR+tjwrl/HbtBmgzqdPbjxokX6DibfcyLy/yFwAhEEAgIEjysVMLPAkk+b4UaBa9wLCBW1WMsefIit1G0gJ+WbVLKFv61RZB/Q4rP/3hhffNXoVbBDg+GaMz5qruZRXbgMyn7+IC8f24++dU/WqUX5EeVrvYlIc7HZ+ufMN8j9+CYo91WGk8Ob4K64VyLru30TpHJtGgtnj4D01Hg3afx/ILF02OSeLgBEhUXU4tY74Q3TpUIANynuLbgkAlN7OLacYxwxtVLXsp38jX9vAOxmsOm9tiJGzW/lw6D0WnCiD1S+BuSOQ+XPd1KPfk9dAgiNY2gb1O2j8PJXoAuXYKf6Qem74J3YByrmDIsSdXZmEU0PALwVG2CvH0dh4iW4lAdZwtwEXvUE1EAGXJoDewXQqfdAPpt1IV0AMPlmDb7vRVQuTMCZ+xnKTghnm5SI17p/KzQT1NI1IPKA0x/51B3Q+nXXQBh6rRSc0U9QPPos3PTy1vajqgEJ2MDGD1E++S7s4nko9gzQjgAIls+CTaKl6JaNQA1sBqWGUJ14DnZCzoxmkSj5vMCDT0H1Z6AXZsEXDkPN/dYJwDhDTuImEUDV5ACckddQmf0WlewhLLEA5bgxBCRHiUTBQ6lUhr3mGVirM9B/vAOVn40B0aYGmDXokfdRmv4C9tVjsBJiWIqrA2lJDZjjTkHfOgZ199Pg4y+DqNVJc8BH8oB433cPaHAb9C9vwFmytEMuo7Ii8xWAkb3Q2UOw/p2OAN8GAK/eDK+ch3XxCMi2u2iplhyaWHmDW6DLediXjwYRrP+uLYAt8MoLsK5810U7RZSltCgz9NBWeMV5OFe/7xHAnZvg2X2wzn3ZI7stgmGtoYe3w/s7i8TcT72lwIMDa/QDeFMvwOqJXn0A0kUeFOzRj1H4YQeSyaj2jUmBKEu967U7gVtWgX/fbwZSUnVcYQimiTvkUeiZAU8zrAf2Qs//Bcx8BstN9hAB+VQ8IECN7AMnVkLPHDB3BjOINnRi02BiACjY65+HLuZQmXwdbjoddFFzC5sIRBORfxnQYFgouevg3v+KiYJQDRnPQyT+SuMgzyiceAtq/hRc1wYp4ZAoCVMgAY8bzOS+oBmsq9C64oc9jH54iZHZzkzbfvhBCoosKEsBQkCx80Y9gOCWUctsy9kQXI9MhbUhRONIS55iAlBfhFJg9VEwALo919shCmwHN69aAUkx+RNROJZ3MRz3zofxGjUAkzslddchoVK99705Id1ChcOPB6dGu4OuU9J798AvMcJ/PwkXZ+WN+AAAAAAASUVORK5CYII="
# Session registry shared by all handler instances: maps an auth_token cookie
# value to {'is_admin': bool, 'create_time': float}.
SESSIONS_STORE = {}
# Initialise the mimetypes module's type map once at import time.
mimetypes.init()
class FileShareHandler(BaseHTTPRequestHandler):
_open_cache = {}
_open_lock = threading.Lock()
def __init__(self, *args, **kwargs):
    """Strip the handler-specific keyword options, then run the base constructor.

    Recognised keyword options (each defaults to ``None``): ``shared_path``,
    ``current_path`` (falls back to ``shared_path`` when missing or falsy),
    ``access_logger``, ``password`` and ``admin_password``.
    """
    share_root = kwargs.pop('shared_path', None)
    start_dir = kwargs.pop('current_path', None)
    self.shared_path = share_root
    self.current_path = start_dir or share_root
    for option in ('access_logger', 'password', 'admin_password'):
        setattr(self, option, kwargs.pop(option, None))
    # The attributes are assigned before delegating so they already exist
    # while the base class constructor runs.
    super().__init__(*args, **kwargs)
def log_message(self, format, *args):
    """Discard the message; this handler emits no per-request log output here."""
    return None
# --- 权限验证 ---
def get_token_from_cookie(self):
    """Return the ``auth_token`` cookie value, or ``None`` when it is absent.

    The ``Cookie`` header is split on ``;``; pieces without ``=`` are ignored
    and, for repeated names, the last occurrence wins.
    """
    raw_header = self.headers.get('Cookie')
    if not raw_header:
        return None
    jar = {}
    for piece in raw_header.split(';'):
        if '=' not in piece:
            continue
        name, value = piece.strip().split('=', 1)
        jar[name] = value
    return jar.get('auth_token')
def check_auth(self):
    """Tell whether the request may access the share.

    Always ``True`` when no access password is configured; otherwise ``True``
    only if the request carries an ``auth_token`` cookie that is a key of
    ``SESSIONS_STORE``.
    """
    if self.password:
        token = self.get_token_from_cookie()
        return bool(token and token in SESSIONS_STORE)
    return True
def check_admin_auth(self):
    """Tell whether the request holds admin rights.

    Always ``True`` when no admin password is configured; otherwise ``True``
    only if the session named by the ``auth_token`` cookie exists in
    ``SESSIONS_STORE`` and has a truthy ``is_admin`` entry.
    """
    if not self.admin_password:
        return True
    token = self.get_token_from_cookie()
    if not token or token not in SESSIONS_STORE:
        return False
    return bool(SESSIONS_STORE[token].get('is_admin', False))
# --- 页面响应 ---
def send_login_page(self, error_msg=""):
    """Send the password form as a 200 HTML response.

    ``error_msg`` is interpolated as-is into the page's error ``<div>``;
    literal percent signs in the CSS are written as ``%%`` because the page
    is filled in with the ``%`` operator.
    """
    login_template = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>验证 - 文件共享</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body { font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background: #f0f2f5; }
.login-box { background: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); width: 100%%; max-width: 320px; text-align: center; }
input { width: 100%%; padding: 10px; margin: 10px 0; border: 1px solid #ddd; border-radius: 4px; box-sizing: border-box; }
button { width: 100%%; padding: 10px; background: #4CAF50; color: white; border: none; border-radius: 4px; cursor: pointer; font-size: 16px; }
button:hover { background: #45a049; }
.error { color: red; font-size: 14px; margin-bottom: 10px; }
</style>
</head>
<body>
<div class="login-box">
<h2>🔒 访问验证</h2>
<div class="error">%s</div>
<form action="/login" method="POST">
<input type="password" name="password" placeholder="请输入访问密码" required autofocus>
<button type="submit">进入</button>
</form>
</div>
</body>
</html>
'''
    page = login_template % error_msg
    self.send_response(200)
    self.send_header('Content-Type', 'text/html; charset=utf-8')
    self.end_headers()
    self.wfile.write(page.encode('utf-8'))
def do_GET(self):
    """Dispatch a GET request.

    Routes: ``/login`` (password form), ``/is_admin`` (JSON admin probe),
    ``/download``, ``/check_exists``, ``/open``, ``/delete`` (admin only),
    ``/dir`` and ``/`` (directory listing). Every route except ``/login``
    requires ``check_auth()``; otherwise the client is redirected to
    ``/login``. File routes called without a ``file`` parameter are answered
    with 400 instead of leaving the request without any response.
    Unexpected exceptions are reported as a 500 error.
    """
    if self.access_logger:
        self.access_logger(self.client_address[0], self.requestline)
    try:
        parsed_path = urllib.parse.urlparse(self.path)
        query_params = urllib.parse.parse_qs(parsed_path.query)
        route = parsed_path.path

        def first(name, default=''):
            """First value of query parameter ``name`` or ``default``."""
            return query_params.get(name, [default])[0]

        if route == '/login':
            self.send_login_page()
            return
        if not self.check_auth():
            self.send_response(302)
            self.send_header('Location', '/login')
            self.end_headers()
            return
        # Lets the front end decide whether to show the admin password dialog.
        if route == '/is_admin':
            is_admin = self.check_admin_auth()
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({'is_admin': is_admin}).encode('utf-8'))
            return

        if route == '/download':
            filename = first('file')
            if not filename:
                self.send_error(400, "Bad Request")
                return
            self.download_file(filename, first('path', self.shared_path))
        elif route == '/check_exists':
            self.check_file_exists(first('file'), first('path', self.shared_path))
        elif route == '/open':
            filename = first('file')
            if not filename:
                self.send_error(400, "Bad Request")
                return
            self.open_file(filename, first('path', self.shared_path))
        elif route == '/delete':
            if not self.check_admin_auth():
                self.send_error(401, "Admin Required")
                return
            filename = first('file')
            if not filename:
                self.send_error(400, "Bad Request")
                return
            self.delete_file(filename, first('path', self.shared_path))
        elif route == '/dir':
            # NOTE(review): parse_qs has already percent-decoded the value, so
            # this second unquote() changes names that contain a literal
            # '%xx' sequence -- confirm whether any client double-encodes.
            self.current_path = urllib.parse.unquote(first('path', self.shared_path))
            self.send_main_page()
        elif route == '/' or route == '':
            self.current_path = self.shared_path
            self.send_main_page()
        else:
            self.send_error(404, "Not Found")
    except Exception as e:
        self.send_error(500, "Server error: {0}".format(str(e)))
def do_POST(self):
    """Dispatch a POST request.

    Routes:
      * ``/login`` -- form field ``password``; on success a session is stored
        in ``SESSIONS_STORE`` and returned as the ``auth_token`` cookie.
      * ``/zip`` -- form fields ``path`` and ``files[]`` (or a single
        ``file``); needs basic access only.
      * ``/verify_admin`` -- JSON body ``{"password": ...}``; marks the
        caller's session as admin. When no access password is configured the
        caller has no session yet, so one is created and sent as a cookie.
      * ``/upload``, ``/rename``, ``/delete``, ``/mkdir`` -- admin only,
        parameters taken from the query string; a 401 JSON reply tells the
        front end to ask for the admin password.

    Missing or malformed parameters yield 400; unexpected exceptions are
    reported as a 500 error.
    """
    import hmac  # function-scope import: not among the module-level imports

    def read_body():
        """Read and decode the body announced by the Content-Length header."""
        length = int(self.headers.get('Content-Length', 0))
        return self.rfile.read(length).decode('utf-8')

    def secrets_match(given, expected):
        """Constant-time equality for two passwords; False unless both are str."""
        if not isinstance(given, str) or not isinstance(expected, str):
            return False
        # compare_digest() rejects non-ASCII str, so compare the UTF-8 bytes.
        return hmac.compare_digest(given.encode('utf-8'), expected.encode('utf-8'))

    if self.access_logger:
        self.access_logger(self.client_address[0], self.requestline)
    try:
        parsed_path = urllib.parse.urlparse(self.path)
        route = parsed_path.path

        if route == '/login':
            post_params = urllib.parse.parse_qs(read_body())
            input_password = post_params.get('password', [''])[0]
            if secrets_match(input_password, self.password):
                token = str(uuid.uuid4())
                SESSIONS_STORE[token] = {'is_admin': False, 'create_time': time.time()}
                self.send_response(302)
                self.send_header('Set-Cookie', f'auth_token={token}; Path=/; Max-Age=86400')
                self.send_header('Location', '/')
                self.end_headers()
            else:
                self.send_login_page("密码错误,请重试。")
            return

        # --- Zip download: basic access is enough, admin rights are not needed ---
        if route == '/zip':
            if not self.check_auth():
                print("[ERROR 403] 未登录尝试下载")
                self.send_error(403, "Forbidden")
                return
            post_params = urllib.parse.parse_qs(read_body())
            current_path = post_params.get('path', [self.shared_path])[0]
            # Several 'files[]' values, or a single 'file' (one folder download).
            files = post_params.get('files[]', []) or post_params.get('file', [])
            self.handle_zip_download(current_path, files)
            return

        if route == '/verify_admin':
            if not self.check_auth():
                self.send_error(403, "Forbidden")
                return
            body = read_body()
            try:
                data = json.loads(body)
            except ValueError:
                data = None
            input_admin_pwd = data.get('password', '') if isinstance(data, dict) else ""
            if secrets_match(input_admin_pwd, self.admin_password):
                token = self.get_token_from_cookie()
                created_session = False
                if not (token and token in SESSIONS_STORE) and not self.password:
                    # Without an access password nobody passes through /login,
                    # so there is no session to upgrade yet: create one now.
                    token = str(uuid.uuid4())
                    SESSIONS_STORE[token] = {'is_admin': False, 'create_time': time.time()}
                    created_session = True
                if token and token in SESSIONS_STORE:
                    SESSIONS_STORE[token]['is_admin'] = True
                    self.send_response(200)
                    if created_session:
                        self.send_header('Set-Cookie', f'auth_token={token}; Path=/; Max-Age=86400')
                    self.send_header('Content-Type', 'application/json')
                    self.end_headers()
                    self.wfile.write(b'{"success": true}')
                else:
                    self.send_error(403, "Session Invalid")
            else:
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(b'{"success": false, "message": "Wrong password"}')
            return

        if not self.check_auth():
            self.send_error(403, "Forbidden")
            return
        if route in ['/upload', '/rename', '/delete', '/mkdir']:
            if not self.check_admin_auth():
                self.send_response(401)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(b'{"error": "admin_required"}')
                return

        query_params = urllib.parse.parse_qs(parsed_path.query)
        current_path = query_params.get('path', [self.shared_path])[0]

        if route == '/upload':
            offset = query_params.get('offset', [None])[0]
            if offset is None:
                self.handle_upload(current_path)
                return
            try:
                start = int(offset)
            except ValueError:
                self.send_error(400, "Bad Request")
                return
            filename = query_params.get('name', [''])[0]
            self.handle_chunk_upload(current_path, filename, start)
        elif route == '/rename':
            old_name = query_params.get('old', [''])[0]
            new_name = query_params.get('new', [''])[0]
            if old_name and new_name:
                self.rename_file(old_name, new_name, current_path)
            else:
                self.send_error(400, "Bad Request")
        elif route == '/delete':
            filename = query_params.get('file', [''])[0]
            if filename:
                self.delete_file(filename, current_path)
            else:
                self.send_error(400, "Bad Request")
        elif route == '/mkdir':
            dirname = query_params.get('name', [''])[0]
            if dirname:
                self.create_directory(dirname, current_path)
            else:
                self.send_error(400, "Bad Request")
        else:
            self.send_error(404, "Not Found")
    except Exception as e:
        self.send_error(500, "Error: {0}".format(str(e)))
def send_main_page(self):
    """Render the directory listing for ``self.current_path`` and send it.

    The requested directory is resolved and replaced by the share root when
    it lies outside ``self.shared_path``. Sub-directories are listed first,
    then files, each group sorted case-insensitively by name. Entries whose
    metadata cannot be read are left out. Both directory and file names are
    HTML-escaped before being written into the page. Any failure while
    building the page is reported as a 500 error.
    """
    try:
        current_path = Path(self.current_path).resolve()
        shared_path = Path(self.shared_path).resolve()
        if shared_path not in current_path.parents and current_path != shared_path:
            current_path = shared_path
        breadcrumb_html = self.get_breadcrumb(current_path)
        files = []
        dirs = []
        try:
            for item in current_path.iterdir():
                try:
                    item_info = {
                        'name': item.name,
                        'modified': time.strftime('%Y-%m-%d %H:%M', time.localtime(item.stat().st_mtime)),
                        'path': str(item),
                        'encoded_name': urllib.parse.quote(item.name),
                        'safe_display_name': html.escape(item.name)
                    }
                    if item.is_file():
                        item_info['size'] = self.format_size(item.stat().st_size)
                        files.append(item_info)
                    else:
                        item_info['size'] = '-'
                        dirs.append(item_info)
                except OSError:
                    # Entry vanished or is unreadable: leave it out of the listing.
                    pass
        except Exception:
            # Listing is best effort: show whatever was collected so far.
            pass
        dirs.sort(key=lambda x: x['name'].lower())
        files.sort(key=lambda x: x['name'].lower())
        current_path_encoded = urllib.parse.quote(str(current_path))

        # NOTE(review): the markup below carries no event-handler attributes
        # and several format arguments ({0}, {3}, {4}) are never referenced,
        # although the page script defines openDir(), downloadFile(),
        # renameFile(), deleteFile() and friends -- presumably the handlers
        # were lost when this file was copied; confirm against the original.
        rows = []
        if current_path != shared_path:
            parent_path = str(current_path.parent)
            rows.append('''
<tr>
<td></td>
<td><span class="dir-link">📁 .. (返回上一级)</span></td>
<td class="size-cell">-</td>
<td class="hide-mobile">-</td>
<td></td>
</tr>
'''.format(urllib.parse.quote(parent_path)))
        for d in dirs:
            # The escaped name is used here exactly as for files, so a
            # directory name can never inject markup or break the attribute.
            rows.append('''
<tr>
<td class="checkbox-cell"><input type="checkbox" class="file-checkbox" value="{1}" data-type="dir"></td>
<td><span class="dir-link">📁 {1}</span></td>
<td class="size-cell">-</td>
<td class="hide-mobile">{2}</td>
<td class="action-buttons">
<button class="action-btn download-btn">下载</button>
<button class="action-btn rename-btn">重命名</button>
<button class="action-btn delete-btn">删除</button>
</td>
</tr>
'''.format(urllib.parse.quote(d['path']), d['safe_display_name'], d['modified'], d['encoded_name'],
           current_path_encoded))
        for f in files:
            rows.append('''
<tr>
<td class="checkbox-cell"><input type="checkbox" class="file-checkbox" value="{0}" data-type="file"></td>
<td class="file-name">📄 {0}</td>
<td class="size-cell">{1}</td>
<td class="hide-mobile">{2}</td>
<td class="action-buttons">
<button class="action-btn open-btn">打开</button>
<button class="action-btn download-btn">下载</button>
<button class="action-btn rename-btn">重命名</button>
<button class="action-btn delete-btn">删除</button>
</td>
</tr>
'''.format(f['safe_display_name'], f['size'], f['modified'], f['encoded_name'], current_path_encoded))
        file_list_html = ''.join(rows)

        # Page template: {0} breadcrumb HTML, {1} table rows, {2} URL-quoted
        # current directory. Literal braces are doubled for str.format().
        html_str = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>文件共享</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
<style>
* {{ box-sizing: border-box; }}
body {{ font-family: "Microsoft YaHei", Arial, sans-serif; margin: 0; padding: 10px; background-color: #f9f9f9; color: #333; }}
.container {{ max-width: 1000px; margin: 0 auto; background: white; padding: 15px; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
.header {{ display: flex; justify-content: space-between; align-items: center; border-bottom: 2px solid #4CAF50; padding-bottom: 10px; margin-bottom: 15px; flex-wrap: wrap; gap: 10px; }}
.header h1 {{ margin: 0; font-size: 1.5em; }}
.toolbar {{ display: flex; gap: 8px; flex-wrap: wrap; }}
.btn {{ padding: 6px 12px; border: none; border-radius: 4px; cursor: pointer; font-size: 14px; color: white; }}
.upload-btn {{ background: #2196F3; }}
.mkdir-btn {{ background: #9C27B0; }}
.batch-btn {{ background: #4CAF50; }}
/* 新增:批量删除按钮样式 */
.delete-batch-btn {{ background: #f44336; }}
.delete-batch-btn:hover {{ background: #d32f2f; }}
/* 同时隐藏文件和文件夹的输入控件 */
#fileInput, #folderInput {{ display: none; }}
.breadcrumb {{ margin: 10px 0; padding: 10px; background: #f0f0f0; border-radius: 4px; word-break: break-all; font-size: 14px; border: 1px solid #ddd; }}
.breadcrumb a {{ color: #2196F3; text-decoration: none; }}
table {{ width: 100%; border-collapse: collapse; margin: 10px 0; table-layout: fixed; }}
th, td {{ padding: 10px 8px; vertical-align: middle; border-bottom: 1px solid #eee; word-break: break-all; }}
/* 表头全局样式 */
th {{ background-color: #546E7A; color: white; font-size: 14px; font-weight: normal; text-align: left; }}
tr:hover {{ background-color: #f5f5f5; }}
.checkbox-cell {{ padding: 0; }}
input[type="checkbox"] {{ transform: scale(1.2); cursor: pointer; }}
.dir-link {{ color: #2196F3; cursor: pointer; font-weight: bold; text-decoration: none; }}
.file-name {{ color: #333; }}
td.size-cell {{ font-size: 13px; color: #666; white-space: nowrap; }}
/* --- 桌面端样式 --- */
th:nth-child(1), td:nth-child(1) {{
width: 40px;
text-align: center;
}}
th:nth-child(2), td:nth-child(2) {{
width: auto;
text-align: left;
white-space: normal;
word-break: break-all;
}}
th:nth-child(3), td:nth-child(3) {{
width: 100px;
text-align: left;
}}
th:nth-child(4), td:nth-child(4) {{
width: 160px;
text-align: left;
}}
th:nth-child(5), td:nth-child(5) {{
width: 220px;
text-align: left;
}}
.action-buttons {{ display: flex; gap: 4px; flex-wrap: wrap; justify-content: flex-start; }}
.action-btn {{ padding: 4px 8px; border: none; border-radius: 3px; cursor: pointer; font-size: 12px; color: white; }}
.open-btn {{ background: #ff9800; }}
.download-btn {{ background: #4CAF50; }}
.rename-btn {{ background: #2196F3; }}
.delete-btn {{ background: #f44336; }}
/* --- 手机端样式 --- */
@media (max-width: 768px) {{
body {{ padding: 5px; }}
.container {{ padding: 8px; }}
.header h1 {{ font-size: 1.2em; }}
.toolbar {{ width: 100%; }}
.btn {{ flex: 1; padding: 8px 0; font-size: 13px; }}
.hide-mobile {{ display: none; }}
th:nth-child(1), td:nth-child(1) {{
width: 30px !important;
padding: 8px 2px !important;
text-align: center;
}}
th:nth-child(2), td:nth-child(2) {{
width: auto !important;
white-space: normal !important;
word-break: break-all !important;
padding: 8px 4px !important;
text-align: left !important;
}}
th:nth-child(3), td:nth-child(3) {{
width: 75px !important;
font-size: 12px;
text-align: center !important;
padding: 8px 2px !important;
}}
th:nth-child(3) {{ color: white !important; background-color: #546E7A; }}
td:nth-child(3) {{ color: #888; }}
th:last-child, td:last-child {{
width: 60px !important;
padding: 2px !important;
text-align: center !important;
}}
.action-buttons {{ flex-direction: column; gap: 2px; width: 100%; justify-content: center; }}
.action-btn {{ width: 100%; margin: 1px 0; padding: 5px 0; font-size: 11px; }}
}}
.status {{ padding: 10px; margin: 10px 0; border-radius: 5px; display: none; position: fixed; top: 20px; left: 50%; transform: translateX(-50%); z-index: 1000; min-width: 200px; text-align: center; box-shadow: 0 4px 12px rgba(0,0,0,0.15); }}
.success {{ background: #d4edda; color: #155724; border: 1px solid #c3e6cb; }}
.error {{ background: #f8d7da; color: #721c24; border: 1px solid #f5c6cb; }}
.progress-container {{ display: none; width: 100%; background-color: #e9ecef; border-radius: 4px; margin: 10px 0; overflow: hidden; height: 24px; }}
.progress-bar {{ width: 0%; height: 100%; background-color: #4CAF50; color: white; text-align: center; line-height: 24px; font-size: 13px; white-space: nowrap; }}
.modal-overlay {{ display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0,0,0,0.5); z-index: 2000; justify-content: center; align-items: center; }}
.modal-box {{ background: white; padding: 20px; border-radius: 8px; width: 300px; text-align: center; box-shadow: 0 4px 20px rgba(0,0,0,0.2); }}
.modal-input {{ width: 100%; padding: 8px; margin: 10px 0; border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box; }}
.modal-btns {{ display: flex; justify-content: space-between; margin-top: 10px; }}
.modal-btn {{ width: 48%; padding: 8px; border: none; border-radius: 4px; cursor: pointer; }}
.modal-confirm {{ background: #2196F3; color: white; }}
.modal-cancel {{ background: #ddd; color: #333; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📁 文件共享</h1>
<div class="toolbar">
<button class="btn batch-btn">批量下载</button>
<button class="btn upload-btn">批量上传</button>
<button class="btn delete-batch-btn">批量删除</button>
<button class="btn mkdir-btn">新建文件夹</button>
<button class="btn" style="background-color: #ff9800;">上传文件夹</button>
<input type="file" id="fileInput" multiple>
<input type="file" id="folderInput" webkitdirectory>
</div>
</div>
<div class="breadcrumb">{0}</div>
<div id="status" class="status"></div>
<div id="progress-container" class="progress-container">
<div id="progress-bar" class="progress-bar"></div>
</div>
<table>
<thead>
<tr>
<th><input type="checkbox"></th>
<th>名称</th>
<th class="size-cell">大小</th>
<th class="hide-mobile">修改时间</th>
<th>操作</th>
</tr>
</thead>
<tbody>{1}</tbody>
</table>
</div>
<div id="admin-modal" class="modal-overlay">
<div class="modal-box">
<h3>🔒 需要管理权限</h3>
<p style="font-size:14px; color:#666; margin-bottom:10px;">执行此操作需要验证管理密码</p>
<input type="password" id="admin-pwd-input" class="modal-input" placeholder="输入管理密码">
<div id="admin-modal-msg" style="color:red; font-size:12px; height:16px;"></div>
<div class="modal-btns">
<button class="modal-btn modal-cancel">取消</button>
<button class="modal-btn modal-confirm">确认</button>
</div>
</div>
</div>
<script>
let pendingAction = null;
// 全局变量存储权限状态
let g_isAdmin = false;
// 页面加载时立即检查权限,这样后续点击就是同步的
document.addEventListener('DOMContentLoaded', () => {{
fetch('/is_admin')
.then(r => r.json())
.then(data => {{ g_isAdmin = data.is_admin; }})
.catch(e => console.log(e));
}});
function showStatus(m,t){{
const s=document.getElementById('status');
s.textContent=m; s.className='status '+t; s.style.display='block';
setTimeout(()=>s.style.display='none',3000);
}}
function openAdminModal(actionCallback) {{
pendingAction = actionCallback;
document.getElementById('admin-modal').style.display = 'flex';
document.getElementById('admin-pwd-input').value = '';
document.getElementById('admin-modal-msg').textContent = '';
document.getElementById('admin-pwd-input').focus();
}}
function closeAdminModal() {{
document.getElementById('admin-modal').style.display = 'none';
pendingAction = null;
}}
function submitAdminAuth() {{
const pwd = document.getElementById('admin-pwd-input').value;
fetch('/verify_admin', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify({{password: pwd}})
}})
.then(r => r.json())
.then(data => {{
if(data.success) {{
document.getElementById('admin-modal').style.display = 'none';
// 更新全局状态
g_isAdmin = true;
if(pendingAction) pendingAction();
}} else {{
document.getElementById('admin-modal-msg').textContent = '密码错误';
}}
}})
.catch(e => {{
document.getElementById('admin-modal-msg').textContent = '验证出错';
}});
}}
document.getElementById('admin-pwd-input').addEventListener("keyup", function(event) {{
if (event.key === "Enter") submitAdminAuth();
}});
// 使用预加载的状态进行同步判断
function checkAdminAndUpload(inputId) {{
if (g_isAdmin) {{
// 有权限:同步调用,手机端支持
document.getElementById(inputId).click();
}} else {{
// 无权限:弹出验证框
openAdminModal(() => {{
// 验证成功后的回调
// 注意:此处因为是在 fetch 回调中,无法自动触发 click。
// 所以只能提示用户再次点击。
showStatus('验证成功,请再次点击上传按钮', 'success');
}});
}}
}}
function handleRequest(url, method, body, successMsg, failMsg) {{
const opts = {{ method: method }};
if(body) opts.body = body;
fetch(url, opts).then(r => {{
if(r.status === 401) {{
openAdminModal(() => handleRequest(url, method, body, successMsg, failMsg));
return;
}}
if(r.ok) {{
showStatus(successMsg, 'success');
setTimeout(() => location.reload(), 500);
}} else {{
showStatus(failMsg, 'error');
}}
}}).catch(() => showStatus('网络错误', 'error'));
}}
function toggleAll(source) {{
const checkboxes = document.querySelectorAll('.file-checkbox');
for(let i=0; i<checkboxes.length; i++) {{
checkboxes[i].checked = source.checked;
}}
}}
function downloadBatch(files) {{
if (!files || files.length === 0) {{
alert("请至少选择一个文件或文件夹!");
return;
}}
const form = document.createElement('form');
form.method = 'POST';
form.action = '/zip?v=' + new Date().getTime();
form.style.display = 'none';
const pathInput = document.createElement('input');
pathInput.name = 'path';
pathInput.value = '{2}';
form.appendChild(pathInput);
files.forEach(f => {{
const fileInput = document.createElement('input');
fileInput.name = 'files[]';
fileInput.value = f;
form.appendChild(fileInput);
}});
document.body.appendChild(form);
form.submit();
document.body.removeChild(form);
}}
function downloadSelected() {{
const checkboxes = document.querySelectorAll('.file-checkbox:checked');
const files = [];
checkboxes.forEach((cb) => {{
files.push(cb.value);
}});
downloadBatch(files);
}}
async function deleteSelected() {{
const checkboxes = document.querySelectorAll('.file-checkbox:checked');
if (checkboxes.length === 0) {{
alert("请至少选择一个项目!");
return;
}}
// 1. 权限检查(参考上传逻辑)
// 如果当前不是管理员,先弹窗验证,验证成功后回调自己
if (!g_isAdmin) {{
openAdminModal(() => deleteSelected());
return;
}}
// 2. 确认提示
const confirmMsg = `确定要删除选中的 ${{checkboxes.length}} 个项目吗?\n注意:删除后无法恢复!`;
if (!confirm(confirmMsg)) return;
// 3. 准备进度条
const pBar = document.getElementById('progress-bar');
const pContainer = document.getElementById('progress-container');
pContainer.style.display = 'block';
// 获取当前路径 (Python formatted path)
const path = '{2}';
let successCount = 0;
const total = checkboxes.length;
// 4. 循环删除
for (let i = 0; i < total; i++) {{
const name = checkboxes[i].value;
const encodedName = encodeURIComponent(name);
// 更新进度条文字
pBar.textContent = `正在删除 [${{i + 1}}/${{total}}]: ${{name}}`;
pBar.style.width = Math.round(((i + 1) / total) * 100) + '%';
pBar.style.backgroundColor = '#f44336'; // 红色进度条表示删除
try {{
// 调用删除接口
const res = await fetch(`/delete?file=${{encodedName}}&path=${{path}}`, {{ method: 'POST' }});
if (res.ok) {{
successCount++;
}} else {{
console.error(`Failed to delete ${{name}}: ${{res.status}}`);
}}
}} catch (e) {{
console.error(e);
}}
}}
// 5. 完成后刷新
pBar.style.backgroundColor = '#4CAF50'; // 变绿表示完成
pBar.textContent = `操作完成,成功删除 ${{successCount}}/${{total}} 个`;
setTimeout(() => location.reload(), 1000);
}}
async function handleFileSelect(e) {{
const files = Array.from(e.target.files);
if (!files.length) return;
e.target.value = '';
const path = '{2}';
const totalFiles = files.length;
for (let i = 0; i < totalFiles; i++) {{
const file = files[i];
// 获取相对路径 (文件夹上传时) 或 文件名 (普通上传时)
const relativePath = file.webkitRelativePath || file.name;
// 顺序上传
const success = await uploadFile(file, path, i + 1, totalFiles, relativePath);
if (!success) {{
console.log(`Skipped or failed: ${{relativePath}}`);
}}
}}
setTimeout(() => location.reload(), 500);
}}
async function uploadFile(file, path, index, totalCount, uploadName) {{
const pBar = document.getElementById('progress-bar');
const pContainer = document.getElementById('progress-container');
const chunkSize = 2 * 1024 * 1024;
pContainer.style.display = 'block';
const prefix = totalCount > 1 ? `[${{index}}/${{totalCount}}] ` : '';
pBar.textContent = `${{prefix}}准备上传: ${{uploadName}}`;
pBar.style.width = '0%';
pBar.style.backgroundColor = '#4CAF50';
let uploadedSize = 0;
const total = file.size;
const encodedName = encodeURIComponent(uploadName);
// 1. 检查是否存在
try {{
const res = await fetch(`/check_exists?file=${{encodedName}}&path=${{path}}`);
if (res.ok) {{
const data = await res.json();
uploadedSize = data.size;
if (uploadedSize >= total) {{
console.log(`File ${{uploadName}} already exists, skipping.`);
return true;
}}
}}
}} catch (e) {{
console.log('Check exists failed');
}}
let offset = uploadedSize;
while (offset < total) {{
const end = Math.min(offset + chunkSize, total);
const chunk = file.slice(offset, end);
const percent = Math.round((end / total) * 100);
pBar.style.width = percent + '%';
pBar.textContent = `${{prefix}}上传中 ${{percent}}% (${{uploadName}})`;
try {{
const res = await fetch(`/upload?path=${{path}}&name=${{encodedName}}&offset=${{offset}}`, {{
method: 'POST',
headers: {{ 'Content-Type': 'application/octet-stream' }},
body: chunk
}});
if (res.status === 401) throw new Error("需管理员权限");
if (!res.ok) throw new Error(`Status ${{res.status}}`);
offset = end;
}} catch (e) {{
pBar.style.backgroundColor = '#f44336';
pBar.textContent = `${{prefix}}失败: ${{e.message}}`;
return false;
}}
}}
pBar.textContent = `${{prefix}}完成!`;
return true;
}}
function downloadFile(n,p){{
window.open(`/download?file=${{n}}&path=${{p}}`,'_blank');
}}
function deleteFile(n,p,isDir){{
const nd=decodeURIComponent(n);
const typeName = isDir ? '文件夹' : '文件';
if(confirm('确定要删除'+typeName+' "'+nd+'" 吗? '+ (isDir?'\\n注意:文件夹内的所有内容也会被删除!':''))) {{
handleRequest(`/delete?file=${{n}}&path=${{p}}`, 'POST', null, '删除成功!', '删除失败');
}}
}}
function renameFile(o,p){{
const od=decodeURIComponent(o);
const nn=prompt('请输入新名称:',od);
if(nn&&nn!==od){{
handleRequest(`/rename?old=${{o}}&new=${{encodeURIComponent(nn)}}&path=${{p}}`, 'POST', null, '重命名成功!', '重命名失败');
}}
}}
function createDir(p) {{
const name = prompt('请输入新文件夹名称:');
if (name) {{
handleRequest(`/mkdir?name=${{encodeURIComponent(name)}}&path=${{p}}`, 'POST', null, '创建成功!', '创建失败');
}}
}}
function openFile(n, p) {{
window.open(`/download?file=${{n}}&path=${{p}}&preview=1`, '_blank');
}}
function openDir(p){{ window.location.href=`/dir?path=${{p}}`; }}
</script>
</body>
</html>
'''.format(breadcrumb_html, file_list_html, current_path_encoded)
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(html_str.encode('utf-8'))
    except Exception as e:
        self.send_error(500, "Page error: {0}".format(str(e)))
def get_breadcrumb(self, current_path):
    """Build the breadcrumb HTML leading from the share root to ``current_path``.

    The first link always points at the share root. One further link is added
    per path component below the root; component names are HTML-escaped and
    link targets are URL-quoted. A ``current_path`` outside the share yields
    only the root link.

    Returns:
        The concatenated HTML fragment as a string.
    """
    shared_path = Path(self.shared_path).resolve()
    current_path = Path(current_path).resolve()
    crumbs = ['<a href="/dir?path={0}">根目录</a>'.format(urllib.parse.quote(str(shared_path)))]
    if current_path != shared_path:
        try:
            rel_parts = current_path.relative_to(shared_path).parts
        except ValueError:
            # Not below the share root: show the root link only.
            rel_parts = ()
        walked = shared_path
        for part in rel_parts:
            walked = walked / part
            crumbs.append('<span>/</span>')
            # Directory names come from the file system and may contain
            # markup characters, so escape them before emitting HTML.
            crumbs.append(
                '<a href="/dir?path={0}">{1}</a>'.format(urllib.parse.quote(str(walked)), html.escape(part)))
    return ''.join(crumbs)
def handle_zip_download(self, current_path, files):
    """Stream the selected entries of ``current_path`` as one zip archive.

    Args:
        current_path: URL-quoted directory the entries are relative to.
        files: Entry names (files or directories); names containing ``%`` are
            percent-decoded first.

    Responds 403 when the directory is not the share root or below it, and
    400 when ``files`` is empty. Every entry (and every file found while
    walking a directory) must resolve to a location inside the share;
    anything else -- ``..`` segments, absolute names, links pointing
    outside -- is skipped. Entries that do not exist are skipped as well.
    The archive is written uncompressed (ZIP_STORED) straight to the
    response stream; errors while streaming are printed, not raised.
    """
    def inside_share(candidate, root):
        """True when ``candidate`` equals ``root`` or lies beneath it.

        Compares whole path components rather than string prefixes, so a
        sibling such as ``share2`` is not mistaken for ``share``.
        """
        try:
            candidate.relative_to(root)
        except ValueError:
            return False
        return True

    try:
        # 1. Resolve the requested directory and the share root.
        current_path = urllib.parse.unquote(current_path)
        base_dir = Path(current_path).resolve()
        shared_root = Path(self.shared_path).resolve()
        # 2. The directory itself must be inside the share.
        if not inside_share(base_dir, shared_root):
            print(f"[ERROR 403] 安全拦截: 请求路径 '{base_dir}' 不在共享根目录 '{shared_root}' 之内")
            self.send_error(403, "Forbidden: Path not allowed")
            return
        if not files:
            self.send_error(400, "No files selected")
            return
        names = [urllib.parse.unquote(n) if '%' in n else n for n in files]
        # 3. Archive name: single entry -> its name without suffix, else a generic one.
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        if len(names) == 1:
            raw_name = names[0]
            safe_name = Path(raw_name).stem if Path(raw_name).suffix else raw_name
            zip_name_str = f"{safe_name}_{timestamp}.zip"
        else:
            zip_name_str = f"批量下载_{timestamp}.zip"
        encoded_filename = urllib.parse.quote(zip_name_str)
        # 4. Response headers, with caching disabled.
        self.send_response(200)
        self.send_header('Content-Type', 'application/zip')
        self.send_header('Content-Disposition',
                         f'attachment; filename="download.zip"; filename*=UTF-8\'\'{encoded_filename}')
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        self.end_headers()
        # 5. Stream the archive (store only, no compression).
        with zipfile.ZipFile(self.wfile, 'w', zipfile.ZIP_STORED, allowZip64=True) as zf:
            for fname in names:
                file_path = base_dir / fname
                if not inside_share(file_path.resolve(), shared_root):
                    print(f"[WARN] Zip download skipped entry outside share: {fname}")
                    continue
                if not file_path.exists():
                    continue
                if file_path.is_file():
                    zf.write(file_path, fname)
                elif file_path.is_dir():
                    for root, _dirs, filenames in os.walk(file_path):
                        for f in filenames:
                            full_path = Path(root) / f
                            if not inside_share(full_path.resolve(), shared_root):
                                continue
                            try:
                                relative_path = full_path.relative_to(base_dir)
                            except ValueError:
                                continue
                            zf.write(full_path, str(relative_path))
    except Exception as e:
        # Typically a dropped connection; keep the server thread alive.
        print(f"[WARN] Zip download interrupted: {e}")
def open_file(self, filename, current_path):
    """Open a shared file with the default application on the machine running this server.

    Args:
        filename: URL-quoted file name.
        current_path: URL-quoted directory containing the file.

    Responds 403 when the resolved file is not inside the share (checked by
    path components, so a sibling directory whose name merely starts with the
    share's name is rejected) and 404 when it is not an existing regular
    file. A repeat request for the same file within 1.5 seconds is
    acknowledged without launching it again. Replies are JSON; failures while
    launching produce a 500 JSON reply.
    """
    try:
        filename = urllib.parse.unquote(filename)
        current_path = urllib.parse.unquote(current_path)
        file_path = Path(current_path) / filename
        resolved = file_path.resolve()
        try:
            resolved.relative_to(Path(self.shared_path).resolve())
        except ValueError:
            self.send_error(403, "Access denied")
            return
        if not file_path.exists() or not file_path.is_file():
            self.send_error(404, "File not found")
            return
        cache_key = str(resolved)
        current_time = time.time()
        with FileShareHandler._open_lock:
            last_opened = FileShareHandler._open_cache.get(cache_key)
            if last_opened is not None and current_time - last_opened < 1.5:
                self.send_response(200)
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(b'{"success": true, "message": "Already opening."}')
                return
            FileShareHandler._open_cache[cache_key] = current_time
            # Drop entries older than 10 seconds so the cache stays small.
            for k in list(FileShareHandler._open_cache.keys()):
                if current_time - FileShareHandler._open_cache[k] > 10:
                    del FileShareHandler._open_cache[k]
        system = platform.system()
        if system == "Windows":
            os.startfile(str(file_path))
        elif system == "Darwin":
            subprocess.Popen(["open", str(file_path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        else:
            subprocess.Popen(["xdg-open", str(file_path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                             start_new_session=True)
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true, "message": "Open command sent."}')
    except Exception:
        self.send_response(500)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": false, "message": "Failed to open file."}')
def check_file_exists(self, filename, current_path):
    """Report the size of a file inside the share as JSON ``{"size": N}``.

    Both arguments arrive URL-encoded. ``N`` is 0 when the target does not
    exist or is not a regular file. Targets that resolve outside
    ``self.shared_path`` are rejected with 403; any other failure yields 500.
    """
    try:
        filename = urllib.parse.unquote(filename)
        current_path = urllib.parse.unquote(current_path)
        file_path = Path(current_path) / filename
        shared_root = Path(self.shared_path).resolve()
        resolved_path = file_path.resolve()
        # Resolve first so ".." segments cannot escape the share, and compare
        # by path components so "<root>_other" is not mistaken for a child.
        if resolved_path != shared_root and shared_root not in resolved_path.parents:
            self.send_error(403)
            return
        size = 0
        if file_path.exists() and file_path.is_file():
            size = file_path.stat().st_size
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({'size': size}).encode('utf-8'))
    except Exception:
        self.send_error(500)
def download_file(self, filename, current_path):
try:
filename = urllib.parse.unquote(filename)
current_path = urllib.parse.unquote(current_path)
file_path = Path(current_path) / filename
if not str(file_path).startswith(str(Path(self.shared_path).resolve())):
self.send_error(403, "Access denied")
return
if not file_path.exists():
self.send_error(404, "File not found")
return
if not file_path.is_file():
self.send_error(400, "Not a file")
return
file_size = file_path.stat().st_size
content_type, encoding = mimetypes.guess_type(file_path)
if not content_type:
content_type = 'application/octet-stream'
range_header = self.headers.get('Range')
start = 0
end = file_size - 1
status_code = 200
if range_header:
try:
m = re.search(r'bytes=(\d+)-(\d*)', range_header)
if m:
g1, g2 = m.groups()
start = int(g1)
if g2:
end = int(g2)
if start >= file_size:
self.send_error(416, "Requested Range Not Satisfiable")
self.send_header("Content-Range", f"bytes */{file_size}")
self.end_headers()
return
status_code = 206
except:
pass
length = end - start + 1
self.send_response(status_code)
self.send_header('Content-Type', content_type)
self.send_header('Accept-Ranges', 'bytes')
if status_code == 206:
self.send_header('Content-Range', f'bytes {start}-{end}/{file_size}')
self.send_header('Content-Length', str(length))
parsed_url = urllib.parse.urlparse(self.path)
query_params = urllib.parse.parse_qs(parsed_url.query)
is_preview = query_params.get('preview', ['0'])[0] == '1'
disposition_type = 'inline' if is_preview else 'attachment'
encoded_filename = Header(filename, 'utf-8').encode()
self.send_header(
'Content-Disposition',
f'{disposition_type}; filename="{encoded_filename}"; filename*=UTF-8\'\'{urllib.parse.quote(filename)}'
)
if encoding:
self.send_header('Content-Encoding', encoding)
self.end_headers()
with open(file_path, 'rb') as f:
f.seek(start)
bytes_sent = 0
chunk_size = 64 * 1024
while bytes_sent < length:
read_len = min(chunk_size, length - bytes_sent)
chunk = f.read(read_len)
if not chunk:
break
try:
self.wfile.write(chunk)
bytes_sent += len(chunk)
except (BrokenPipeError, ConnectionResetError):
break
except Exception as e:
pass
def delete_file(self, filename, current_path):
    """Delete a file or a whole directory tree inside the share.

    Both arguments arrive URL-encoded. The target must resolve to a location
    strictly below ``self.shared_path`` (the shared root itself can never be
    deleted); otherwise 403 is returned. A missing target yields 404, success
    yields JSON ``{"success": true}`` and any other failure yields 500.
    """
    try:
        filename = urllib.parse.unquote(filename)
        current_path = urllib.parse.unquote(current_path)
        target_path = Path(current_path) / filename
        shared_root = Path(self.shared_path).resolve()
        # Resolve first so ".." cannot reach outside the share. Requiring the
        # root among the parents also rules out deleting the root itself.
        if shared_root not in target_path.resolve().parents:
            self.send_error(403, "Access denied")
            return
        if not target_path.exists():
            self.send_error(404, "Not found")
            return
        if target_path.is_file():
            target_path.unlink()
        elif target_path.is_dir():
            shutil.rmtree(target_path)
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true}')
    except Exception as e:
        # The status line must be latin-1 encodable; OS error texts and file
        # names often are not, so the detail goes into the response body.
        self.send_error(500, "Delete error", str(e))
def rename_file(self, old_name, new_name, current_path):
    """Rename (or move) an entry inside the share.

    All arguments arrive URL-encoded. Both the source and the destination
    must resolve to locations strictly below ``self.shared_path``; otherwise
    403 is returned. 404 is returned when the source is missing, 409 when the
    destination already exists, JSON ``{"success": true}`` on success and 500
    on any other failure.
    """
    try:
        old_name = urllib.parse.unquote(old_name)
        new_name = urllib.parse.unquote(new_name)
        current_path = urllib.parse.unquote(current_path)
        old_path = Path(current_path) / old_name
        new_path = Path(current_path) / new_name
        shared_root = Path(self.shared_path).resolve()
        # Resolve before comparing so "../" in either name cannot move data
        # out of (or into) the share; the root itself may not be renamed.
        for candidate in (old_path, new_path):
            if shared_root not in candidate.resolve().parents:
                self.send_error(403, "Access denied")
                return
        if not old_path.exists():
            self.send_error(404, "File not found")
            return
        if new_path.exists():
            self.send_error(409, "Target already exists")
            return
        old_path.rename(new_path)
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true}')
    except Exception as e:
        # Keep the status line ASCII; the (possibly non-latin-1) detail goes
        # into the response body instead.
        self.send_error(500, "Rename error", str(e))
def create_directory(self, dirname, current_path):
    """Create a single new directory inside the share.

    Both arguments arrive URL-encoded. 403 is returned when the new path
    resolves outside ``self.shared_path``, 409 when it already exists, JSON
    ``{"success": true}`` on success and 500 on any other failure (including
    a missing parent directory, since parents are not created).
    """
    try:
        dirname = urllib.parse.unquote(dirname)
        current_path = urllib.parse.unquote(current_path)
        new_dir_path = Path(current_path) / dirname
        shared_root = Path(self.shared_path).resolve()
        resolved_path = new_dir_path.resolve()
        # Resolve first so ".." cannot escape the share, and compare by path
        # components so "<root>_other" is not treated as a child.
        if resolved_path != shared_root and shared_root not in resolved_path.parents:
            self.send_error(403, "Access denied")
            return
        if new_dir_path.exists():
            self.send_error(409, "Directory already exists")
            return
        new_dir_path.mkdir()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true}')
    except Exception as e:
        # Keep the status line ASCII; the (possibly non-latin-1) detail goes
        # into the response body instead.
        self.send_error(500, "Mkdir error", str(e))
def handle_chunk_upload(self, current_path, filename, offset):
    """Write one chunk of an upload into its target file at ``offset``.

    ``current_path`` and ``filename`` arrive URL-encoded; ``filename`` may
    contain sub-directories, which are created on demand. A chunk with
    ``offset == 0`` truncates or creates the file first. The request body
    (``Content-Length`` bytes) is then written at ``offset``.

    Responds with 403 when the target resolves outside ``self.shared_path``,
    JSON ``{"success": true}`` once the whole chunk is stored, and 500 on any
    failure, including a body shorter than announced.
    """
    try:
        current_path = urllib.parse.unquote(current_path)
        filename = urllib.parse.unquote(filename)
        # Strip every leading separator in one pass; chained lstrip() calls
        # would leave "\/name" absolute.
        filename = filename.lstrip('/\\')
        save_path = Path(current_path) / filename
        shared_root = Path(self.shared_path).resolve()
        # Resolve first so ".." cannot escape the share; the upload target
        # must sit strictly below the shared root.
        if shared_root not in save_path.resolve().parents:
            self.send_error(403, "Access denied")
            return

        # --- Create missing parent directories ---
        try:
            save_path.parent.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            print(f"Create dir error: {e}")
            self.send_error(500, "Failed to create directory")
            return

        if offset == 0:
            # First chunk: start from an empty file.
            with open(save_path, 'wb'):
                pass
        mode = 'r+b' if save_path.exists() else 'wb'
        with open(save_path, mode) as f:
            f.seek(offset)
            bytes_left = int(self.headers.get('Content-Length', 0))
            read_block = 65536
            while bytes_left > 0:
                chunk = self.rfile.read(min(read_block, bytes_left))
                if not chunk:
                    break
                f.write(chunk)
                bytes_left -= len(chunk)
        if bytes_left > 0:
            # A short body must not be acknowledged, or the client would
            # continue from an offset that was never written.
            raise ConnectionError("Upload aborted by client")

        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true}')
    except Exception as e:
        try:
            # Status line stays ASCII; the detail goes into the body.
            self.send_error(500, "Chunk upload failed", str(e))
        except Exception:
            pass  # the connection is already gone; nothing to report to
def handle_upload(self, current_path):
    """Store the first file part of a multipart/form-data POST.

    ``current_path`` arrives URL-encoded; when it resolves outside
    ``self.shared_path`` the upload falls back to the shared root. The file
    name comes from the part's ``filename="..."`` header and may contain
    sub-directories, which are created on demand. An existing file is never
    overwritten: ``_1``, ``_2``, ... is appended to the name instead.

    Responds with 400 for a malformed request, 403 when the target resolves
    outside the share, JSON ``{"success": true}`` on success, and 500 (after
    removing the partially written file) on any other failure.
    """
    partial_path = None  # file created by this request; removed on failure
    try:
        shared_root = Path(self.shared_path).resolve()
        upload_dir = Path(urllib.parse.unquote(current_path)).resolve()
        # Component-wise containment; a string prefix test would accept a
        # sibling directory such as "<root>_other".
        if upload_dir != shared_root and shared_root not in upload_dir.parents:
            upload_dir = shared_root

        content_type = self.headers.get('Content-Type', '')
        if 'boundary=' not in content_type:
            self.send_error(400, "Invalid content type")
            return
        content_length = int(self.headers.get('Content-Length', 0))
        if content_length == 0:
            self.send_error(400, "Empty content")
            return
        boundary = content_type.split('boundary=')[1].strip()
        if boundary.startswith('"') and boundary.endswith('"'):
            boundary = boundary[1:-1]
        boundary_start = b'--' + boundary.encode('utf-8')

        # Consume the part headers up to the blank line, picking up the name.
        bytes_read = 0
        filename = None
        while True:
            line = self.rfile.readline()
            bytes_read += len(line)
            if not line:
                break
            if b'filename="' in line:
                header_text = line.decode('utf-8', errors='ignore')
                filename = header_text.split('filename="')[1].split('"')[0]
            if line == b'\r\n':
                break
        if filename:
            # Strip every leading separator so the name stays relative.
            filename = filename.lstrip('/\\')
        if not filename:
            self.send_error(400, "Can't find filename")
            return

        save_path = upload_dir / filename
        # Second check: the file name itself may contain "..".
        if shared_root not in save_path.resolve().parents:
            self.send_error(403, "Forbidden Path")
            return
        save_path.parent.mkdir(parents=True, exist_ok=True)

        stem, ext = os.path.splitext(filename)
        counter = 1
        while save_path.exists():
            save_path = upload_dir / "{0}_{1}{2}".format(stem, counter, ext)
            counter += 1

        chunk_size = 65536
        remaining = content_length - bytes_read
        # Hold back enough trailing bytes to cover a "\r\n--boundary" that is
        # split across two reads, so no part of it reaches the file.
        holdback = len(boundary_start) + 2
        pending = b''
        partial_path = save_path
        with open(save_path, 'wb') as f:
            while remaining > 0:
                chunk = self.rfile.read(min(chunk_size, remaining))
                if not chunk:
                    raise ConnectionError("Upload aborted by client")
                remaining -= len(chunk)
                pending += chunk
                end_pos = pending.find(boundary_start)
                if end_pos != -1:
                    file_data = pending[:end_pos]
                    # The CRLF before the boundary belongs to the delimiter.
                    if file_data.endswith(b'\r\n'):
                        file_data = file_data[:-2]
                    f.write(file_data)
                    pending = b''
                    break
                if len(pending) > holdback:
                    f.write(pending[:-holdback])
                    pending = pending[-holdback:]
            # Body ended without a closing boundary: keep what was received.
            f.write(pending)

        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(b'{"success": true}')
    except Exception as e:
        print(f"Upload interrupted/error: {e}")
        if partial_path and partial_path.exists():
            try:
                time.sleep(0.1)  # brief pause before removing the partial file
                os.remove(partial_path)
            except OSError:
                pass
        try:
            # Status line stays ASCII; the detail goes into the body.
            self.send_error(500, "Upload aborted", str(e))
        except Exception:
            pass  # the connection is already gone
def format_size(self, size):
    """Render a byte count as a one-decimal string with a B/KB/MB/GB/TB unit."""
    value = size
    units = ('B', 'KB', 'MB', 'GB')
    position = 0
    while position < len(units):
        if value < 1024.0:
            return "{0:.1f} {1}".format(value, units[position])
        value = value / 1024.0
        position += 1
    # Anything that is still >= 1024 GB is reported in terabytes.
    return "{0:.1f} TB".format(value)
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles every request in its own daemon thread."""

    # Request threads must not keep the process alive on exit.
    daemon_threads = True

    def __init__(self, server_address, RequestHandlerClass):
        # Has to be in place before the base constructor binds the socket,
        # because server_bind() consults it.
        self.allow_reuse_address = True
        super().__init__(server_address, RequestHandlerClass)
class FastBindHTTPServer(ThreadingMixIn, HTTPServer):
    """Threaded IPv4 HTTP server whose bind step skips the FQDN lookup.

    ``HTTPServer.server_bind`` calls ``socket.getfqdn``; this override binds
    the socket directly and fills in the attributes the base implementations
    would otherwise have set.
    """

    address_family = socket.AF_INET
    # Without this, server_close() and interpreter exit wait for every
    # in-flight request thread (for example a long download).
    daemon_threads = True

    def server_bind(self):
        """Bind with SO_REUSEADDR and record the address actually bound."""
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        # Reflect the real address (matters when port 0 was requested) and
        # provide server_name/server_port without a DNS round trip.
        self.server_address = self.socket.getsockname()
        host, port = self.server_address[:2]
        self.server_name = host
        self.server_port = port
class FileSharer:
    """Owns the HTTP file server: configuration, start/stop and status."""

    def __init__(self):
        self.server = None
        self.server_thread = None
        self.shared_path = None
        self.password = None
        self.admin_password = None
        self.port = DEFAULT_PORT
        self.is_running = False
        self.status_callback = None
        # (time string, client ip, request line) tuples from request handlers.
        self.log_queue: queue.Queue = queue.Queue()

    def set_status_callback(self, callback):
        """Register ``callback(message, is_running)`` for status changes."""
        self.status_callback = callback

    def update_status(self, message, is_running):
        """Record the running flag and forward the status to the callback."""
        self.is_running = is_running
        if self.status_callback:
            self.status_callback(message, is_running)

    def _allow_firewall_access(self):
        """Add an inbound Windows firewall rule for the port (no-op elsewhere)."""
        if platform.system() != 'Windows':
            return
        try:
            # Keep the netsh console window from appearing.
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            result = subprocess.run(
                [
                    'netsh', 'advfirewall', 'firewall', 'add', 'rule',
                    'name=FileShareTool_{0}'.format(self.port),
                    'dir=in', 'action=allow', 'protocol=TCP',
                    'localport={0}'.format(self.port),
                    'remoteip=localsubnet', 'profile=private',
                    'enable=yes'
                ],
                capture_output=True,
                text=True,
                errors='replace',
                check=False,
                startupinfo=startupinfo,
                creationflags=subprocess.CREATE_NO_WINDOW
            )
            if result.returncode == 0:
                print("已添加新防火墙规则 {0} 端口".format(self.port))
            elif result.returncode == 1:
                print("提示: 自动添加防火墙规则失败(权限不足)。")
            else:
                print(f"防火墙规则添加失败,错误代码: {result.returncode}")
        except Exception as e:
            print("添加防火墙规则失败(未知错误): {0}".format(str(e)))

    def stop(self):
        """Report "stopped" immediately and shut the server down in the background."""
        if not self.is_running or not self.server:
            return
        # Detach the server first: the worker below must act on this exact
        # instance even if start() installs a new one in the meantime.
        server = self.server
        self.server = None
        self.update_status("已停止", False)

        def cleanup_worker():
            global server_instance
            try:
                server.shutdown()
                server.server_close()
            except Exception as e:
                print(f"后台清理服务器时出错: {e}")
            finally:
                # Only clear the module-level handle when no newer server
                # has been started since.
                if self.server is None:
                    server_instance = None

        threading.Thread(target=cleanup_worker, daemon=True).start()

    def get_all_local_ips(self):
        """Return this host's non-loopback IPv4 addresses.

        Falls back to the address of the default outbound interface and
        finally to ``['127.0.0.1']``; the result is never empty.
        """
        ips = []
        try:
            addr_infos = socket.getaddrinfo(socket.gethostname(), None,
                                            socket.AF_INET, 0, socket.SOL_TCP)
        except (OSError, UnicodeError):
            addr_infos = []  # host name cannot be resolved
        for info in addr_infos:
            ip = info[4][0]
            if ip not in ips and not ip.startswith('127.'):
                ips.append(ip)
        if not ips:
            try:
                # connect() on a UDP socket sends nothing; it only makes the
                # OS choose the outbound interface, whose address is read back.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                    probe.connect(('8.8.8.8', 80))
                    ips.append(probe.getsockname()[0])
            except OSError:
                pass  # no route; the loopback fallback below applies
        return ips if ips else ['127.0.0.1']

    def start(self, path=None, port=None, password=None, admin_password=None):
        """Start serving ``path`` on ``port``.

        Invalid or missing ``path`` / ``port`` fall back to the defaults.
        Returns ``(True, info_dict)`` on success or ``(False, message)`` on
        failure; ``info_dict`` carries the URLs, port, folder, log queue and
        the password flags.
        """
        global server_instance
        if self.is_running:
            return False, "服务已在运行中。"
        self.update_status("正在启动...", True)
        self.shared_path = path if path and Path(path).exists() else DEFAULT_SHARE_PATH
        self.port = port if port and isinstance(port, int) and 1 <= port <= 65535 else DEFAULT_PORT
        self.password = password
        self.admin_password = admin_password
        SESSIONS_STORE.clear()
        if not Path(self.shared_path).exists():
            self.update_status("启动失败", False)
            return False, "路径不存在"

        def access_logger(client_ip, request_line):
            self.log_queue.put((time.strftime('%H:%M:%S'), client_ip, request_line))

        def handler_factory(*args):
            # Settings are read per request, as the server passes only the
            # socket-level arguments.
            return FileShareHandler(*args,
                                    shared_path=self.shared_path,
                                    access_logger=access_logger,
                                    password=self.password,
                                    admin_password=self.admin_password)

        try:
            print("[DEBUG] 准备创建服务器...")
            t_server = time.time()
            self.server = FastBindHTTPServer(('', self.port), handler_factory)
            print(f"[DEBUG] HTTPServer 创建耗时: {time.time() - t_server:.4f}s")
            # Adding the firewall rule automatically is flagged by antivirus
            # software, so the user has to allow access manually.
            # self._allow_firewall_access()
            self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True)
            self.server_thread.start()
            local_ips = self.get_all_local_ips()
            main_ip = local_ips[0] if local_ips else '127.0.0.1'
            local_url = "http://localhost:{0}".format(self.port)
            network_urls = ["http://{0}:{1}".format(ip, self.port) for ip in local_ips]
            # Declared global above; a bare assignment would only create a
            # local variable and leave the module-level handle untouched.
            server_instance = self
            self.update_status("运行中 (端口 {0})".format(self.port), True)
            return True, {
                'local_url': local_url,
                'network_urls': network_urls,
                'main_ip': main_ip,
                'all_ips': local_ips,
                'port': self.port,
                'folder': self.shared_path,
                'log_queue': self.log_queue,
                'has_password': bool(self.password),
                'has_admin_password': bool(self.admin_password)
            }
        except Exception as e:
            self.update_status("启动失败", False)
            print(f"[DEBUG] 启动异常: {e}")
            return False, "启动失败: {0}".format(str(e))
class ClickableLabel(QLabel):
    """A QLabel that emits ``clicked`` on every mouse press."""

    clicked = pyqtSignal()

    def __init__(self, text="", parent=None):
        super().__init__(text, parent)

    def mousePressEvent(self, event):
        # Notify listeners first, then let QLabel run its default handling.
        self.clicked.emit()
        super().mousePressEvent(event)
class FileShareGUI(QMainWindow):
status_updated = pyqtSignal(str, bool)
start_completed = pyqtSignal(bool, object)
def __init__(self, initial_path=None, initial_port=None, auto_start=True):
    """Build the main window, restore settings and optionally auto-start.

    ``initial_path`` / ``initial_port`` override the saved values when given;
    with ``auto_start`` set, sharing is started shortly after construction.
    """
    super().__init__()
    self.setWindowTitle("文件共享工具 v5.5")
    self.set_window_icon()

    # --- Plain state ---
    self.settings = QSettings("FileShareTool", "App")
    self.sharer = FileSharer()
    self.start_result = None
    self.initial_path = initial_path
    self.initial_port = initial_port
    self.auto_start = auto_start
    self.reg_key_name = "FileShareTool"
    self.startup_reg_key_name = "FileShareToolStartup"
    self.log_queue = None
    self.log_timer = QTimer(self)
    self.setAcceptDrops(True)
    self.qr_items = []
    self.selected_qr_index = None

    # --- UI construction; later steps rely on the earlier ones ---
    build_steps = (
        self.setup_tray_icon,
        self.create_widgets,
        self.connect_signals,
        self.load_settings,
        self.update_menu_btn_text,
        self.update_startup_btn_text,
    )
    for step in build_steps:
        step()
    self.update_status("已停止", False)

    if self.auto_start:
        # Deferred via a single-shot timer rather than called directly.
        QTimer.singleShot(100, self.auto_start_share)
def set_window_icon(self):
    """Use the embedded base64 PNG as the window icon (best effort)."""
    if not ICON_BASE64:
        return
    try:
        raw_png = base64.b64decode(ICON_BASE64)
        icon_pixmap = QPixmap()
        icon_pixmap.loadFromData(raw_png)
        self.setWindowIcon(QIcon(icon_pixmap))
    except Exception:
        # A missing icon is purely cosmetic; keep the default one.
        pass
def setup_tray_icon(self):
    """Create the system tray icon together with its context menu.

    The embedded base64 PNG is used as the icon; when it is absent or cannot
    be decoded, the style's standard computer icon is used instead. The menu
    offers: show main window, toggle sharing, quit.
    """
    self.tray_icon = QSystemTrayIcon(self)

    icon = None
    if ICON_BASE64:
        try:
            pixmap = QPixmap()
            # loadFromData() reports failure through its return value; an
            # unchecked failure would leave the tray with an empty icon.
            if pixmap.loadFromData(base64.b64decode(ICON_BASE64)):
                icon = QIcon(pixmap)
        except (ValueError, TypeError):
            icon = None  # malformed base64 data
    if icon is None:
        icon = self.style().standardIcon(self.style().SP_ComputerIcon)
    self.tray_icon.setIcon(icon)

    tray_menu = QMenu()
    show_action = QAction("显示主窗口", self)
    show_action.triggered.connect(self.show_window)
    tray_menu.addAction(show_action)
    toggle_action = QAction("启动/停止共享", self)
    toggle_action.triggered.connect(self.toggle_share)
    tray_menu.addAction(toggle_action)
    tray_menu.addSeparator()
    quit_action = QAction("退出程序", self)
    quit_action.triggered.connect(self.quit_application)
    tray_menu.addAction(quit_action)

    self.tray_icon.setContextMenu(tray_menu)
    self.tray_icon.activated.connect(self.tray_icon_activated)
    self.tray_icon.setToolTip("文件共享工具")
    self.tray_icon.show()
def show_window(self):
    """Restore the main window, give it focus and bring it to the front."""
    for action in (self.showNormal, self.activateWindow, self.raise_):
        action()
def tray_icon_activated(self, reason):
    """Show the main window when the tray icon is double-clicked."""
    if reason != QSystemTrayIcon.DoubleClick:
        return
    self.show_window()
def quit_application(self):
    """Persist settings, stop a running share, then exit the application."""
    self.save_settings()
    sharer = self.sharer
    if sharer.is_running:
        sharer.stop()
        # Short pause after requesting the stop, before quitting.
        time.sleep(0.1)
    self.tray_icon.hide()
    QApplication.quit()
def changeEvent(self, event):
    """Hide the window to the system tray whenever it gets minimized."""
    minimized_now = event.type() == event.WindowStateChange and self.isMinimized()
    if minimized_now:
        # Hiding is queued on the event loop rather than done inside the handler.
        QTimer.singleShot(0, self.hide)
        self.tray_icon.showMessage("文件共享工具", "程序已最小化到系统托盘", QSystemTrayIcon.Information, 2000)
    super().changeEvent(event)
def toggle_extra_settings(self):
    """Show or hide the extra-settings panel to match the expand button."""
    expanded = self.expand_btn.isChecked()
    self.extra_settings_widget.setVisible(expanded)
    caption = "收起设置↑" if expanded else "更多设置..."
    self.expand_btn.setText(caption)
def create_widgets(self):
    """Build the main-window layout.

    From top to bottom: shared-path row, main control row, collapsible
    extra-settings panel, info/log splitter and the initially hidden QR strip.
    """
    root = QWidget()
    self.setCentralWidget(root)
    outer = QVBoxLayout(root)

    # --- Shared-path selection row ---
    path_row = QHBoxLayout()
    path_caption = QLabel("共享路径:")
    self.path_entry = QLineEdit()
    self.select_path_btn = QPushButton("浏览")
    self.select_path_btn.setFixedWidth(80)
    self.select_path_btn.clicked.connect(self.browse_path)
    for item in (path_caption, self.path_entry, self.select_path_btn):
        path_row.addWidget(item)
    outer.addLayout(path_row)

    # --- Main control row ---
    controls = QHBoxLayout()
    self.toggle_share_btn = QPushButton("启动共享")
    controls.addWidget(self.toggle_share_btn)
    controls.addSpacing(10)

    port_caption = QLabel("端口:")
    self.port_entry = QLineEdit()
    self.port_entry.setFixedWidth(70)
    controls.addWidget(port_caption)
    controls.addWidget(self.port_entry)
    controls.addSpacing(10)

    password_caption = QLabel("访问密码:")
    self.password_entry = QLineEdit()
    self.password_entry.setPlaceholderText("留空无保护")
    self.password_entry.setFixedWidth(110)
    controls.addWidget(password_caption)
    controls.addWidget(self.password_entry)
    controls.addSpacing(10)

    self.browser_btn = QPushButton("打开浏览器")
    controls.addWidget(self.browser_btn)
    controls.addStretch(1)

    self.expand_btn = QPushButton("更多设置...")
    self.expand_btn.setCheckable(True)
    self.expand_btn.setFixedWidth(120)
    self.expand_btn.clicked.connect(self.toggle_extra_settings)
    controls.addWidget(self.expand_btn)
    outer.addLayout(controls)

    # --- Extra settings panel (collapsed by default) ---
    self.extra_settings_widget = QWidget()
    self.extra_settings_widget.setObjectName("ExtraWidget")
    self.extra_settings_widget.setStyleSheet("""
#ExtraWidget {
background-color: #f5f5f5;
border: 1px solid #ddd;
border-radius: 5px;
}
""")
    extra_row = QHBoxLayout(self.extra_settings_widget)
    extra_row.setContentsMargins(10, 8, 10, 8)
    admin_caption = QLabel("管理密码:")
    self.admin_password_entry = QLineEdit()
    self.admin_password_entry.setPlaceholderText("留空无保护")
    self.admin_password_entry.setFixedWidth(110)
    self.admin_password_entry.setToolTip("设置后,上传/删除/重命名需二次验证")
    self.startup_btn = QPushButton("开机自启")
    self.menu_btn = QPushButton("添加右键菜单")
    extra_row.addWidget(admin_caption)
    extra_row.addWidget(self.admin_password_entry)
    extra_row.addStretch(1)
    extra_row.addWidget(self.startup_btn)
    extra_row.addWidget(self.menu_btn)
    self.extra_settings_widget.setVisible(False)
    outer.addWidget(self.extra_settings_widget)

    # --- Info and log panes in a vertical, user-resizable splitter ---
    panes = QSplitter(Qt.Vertical)

    # 1. Upper pane: access information
    info_pane = QWidget()
    info_box = QVBoxLayout(info_pane)
    info_box.setContentsMargins(0, 0, 0, 0)
    info_caption = QLabel("访问信息:")
    self.info_text = QTextEdit()
    self.info_text.setReadOnly(True)
    # No fixed height here: the splitter controls the pane height.
    info_box.addWidget(info_caption)
    info_box.addWidget(self.info_text)

    # 2. Lower pane: access log
    log_pane = QWidget()
    log_box = QVBoxLayout(log_pane)
    log_box.setContentsMargins(0, 0, 0, 0)  # no inner margins
    log_caption = QLabel("访问记录:")
    self.log_text = QTextEdit()
    self.log_text.setReadOnly(True)
    log_box.addWidget(log_caption)
    log_box.addWidget(self.log_text)

    panes.addWidget(info_pane)
    panes.addWidget(log_pane)
    # Initial sizes requested for the info pane and the log pane.
    panes.setSizes([300, 600])
    for pane_index in (0, 1):
        # Neither pane may be dragged down to nothing.
        panes.setCollapsible(pane_index, False)
    outer.addWidget(panes)

    # --- QR-code strip ---
    self.qr_container = QWidget()
    self.qr_layout = QHBoxLayout(self.qr_container)
    self.qr_layout.setSpacing(20)
    # Two stretches; QR items are later inserted between them.
    self.qr_layout.addStretch(1)
    self.qr_layout.addStretch(1)
    self.qr_container.hide()
    outer.addWidget(self.qr_container)
def connect_signals(self):
    """Wire the sharer callback, button clicks, window signals and log timer."""
    self.sharer.set_status_callback(self.emit_status_update)
    wiring = (
        (self.toggle_share_btn.clicked, self.toggle_share),
        (self.browser_btn.clicked, self.open_browser),
        (self.menu_btn.clicked, self.toggle_right_click_menu),
        (self.startup_btn.clicked, self.toggle_startup),
        (self.status_updated, self.update_status),
        (self.start_completed, self.update_ui_after_start),
        (self.log_timer.timeout, self.update_log_display),
    )
    for signal, slot in wiring:
        signal.connect(slot)
def load_settings(self):
    """Restore window geometry and input fields from the saved settings.

    ``initial_path`` / ``initial_port`` take precedence over the saved path
    and port when they are set.
    """
    store = self.settings
    self.resize(store.value("size", QSize(750, 580)))
    self.move(store.value("pos", QPoint(200, 200)))
    stored_path = store.value("path", DEFAULT_SHARE_PATH)
    self.path_entry.setText(self.initial_path or stored_path)
    port_value = self.initial_port or store.value("port", DEFAULT_PORT)
    self.port_entry.setText(str(port_value))
    password_fields = (
        ("password", self.password_entry),
        ("admin_password", self.admin_password_entry),
    )
    for key, entry in password_fields:
        entry.setText(store.value(key, ""))
def save_settings(self):
    """Persist window geometry and the current contents of the input fields."""
    snapshot = (
        ("size", self.size()),
        ("pos", self.pos()),
        ("path", self.path_entry.text()),
        ("port", self.port_entry.text()),
        ("password", self.password_entry.text()),
        ("admin_password", self.admin_password_entry.text()),
    )
    for key, value in snapshot:
        self.settings.setValue(key, value)
def closeEvent(self, event):
    """Save settings and quit; ask for confirmation while sharing is active."""
    self.save_settings()
    if self.sharer.is_running:
        answer = QMessageBox.question(self, '确认', '共享服务仍在运行,确定要退出吗?',
                                      QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if answer != QMessageBox.Yes:
            # The user chose to keep the application running.
            event.ignore()
            return
        self.sharer.stop()
        time.sleep(0.1)
    self.tray_icon.hide()
    event.accept()
    QApplication.quit()
def dragEnterEvent(self, event):
    """Accept a drag only when it carries URLs."""
    if not event.mimeData().hasUrls():
        event.ignore()
        return
    event.acceptProposedAction()
def dropEvent(self, event):
    """Put the first dropped local path into the path field if it exists."""
    mime = event.mimeData()
    if not mime.hasUrls():
        return
    dropped = mime.urls()[0].toLocalFile()
    if Path(dropped).exists():
        self.path_entry.setText(dropped)
def browse_path(self):
    """Let the user pick the shared folder in a directory dialog."""
    chosen = QFileDialog.getExistingDirectory(self, "选择共享文件夹", self.path_entry.text())
    if not chosen:
        # Dialog cancelled: keep the current path.
        return
    self.path_entry.setText(chosen)
def toggle_share(self):
    """Stop sharing when it is running, start it otherwise."""
    action = self.stop_share if self.sharer.is_running else self.start_share
    action()
def start_share(self):
    """Validate the form and start the server on a background thread.

    Shows a warning and aborts when the path does not exist or the port is
    not an integer in 1-65535 (the port field is then reset to the default).
    """
    path = self.path_entry.text().strip()
    if not (path and Path(path).exists()):
        QMessageBox.warning(self, "警告", "请选择一个有效的共享路径!")
        return

    port_text = self.port_entry.text().strip()
    try:
        port = int(port_text)
        if port < 1 or port > 65535:
            raise ValueError
    except ValueError:
        QMessageBox.warning(self, "警告", "请输入有效的端口号 (1-65535)!")
        self.port_entry.setText(str(DEFAULT_PORT))
        return

    password = self.password_entry.text().strip()
    admin_password = self.admin_password_entry.text().strip()
    for pane in (self.info_text, self.log_text):
        pane.clear()
    # Disabled here; update_status() sets the enabled state again later.
    self.toggle_share_btn.setEnabled(False)
    worker = threading.Thread(
        target=self._start_share_thread,
        args=(path, port, password, admin_password),
        daemon=True,
    )
    worker.start()
def _start_share_thread(self, path, port, password, admin_password):
success, result = self.sharer.start(path, port, password, admin_password)
self.start_result = result
self.start_completed.emit(success, result)
def stop_share(self):
    """Ask the sharer to stop the running server."""
    sharer = self.sharer
    sharer.stop()
def open_browser(self):
    """Open the share in the default browser.

    While the server is running, the local URL from the last successful
    start is opened and the first LAN URL (if any) is copied to the
    clipboard. Otherwise ``http://localhost:<port>`` is opened, using the
    port field or the default port when the field is not a number.
    """
    import webbrowser

    result = self.start_result
    if result and 'local_url' in result and self.sharer.is_running:
        url_to_open = result['local_url']
        if 'network_urls' in result and result['network_urls']:
            network_url = result['network_urls'][0]
            QApplication.clipboard().setText(network_url)
            self.statusBar().showMessage(f"已复制局域网地址: {network_url}", 4000)
    else:
        try:
            port = int(self.port_entry.text().strip())
        except ValueError:
            # Only a non-numeric port field is expected here; anything else
            # should surface instead of being swallowed.
            port = DEFAULT_PORT
        url_to_open = f"http://localhost:{port}"
    if url_to_open:
        webbrowser.open(url_to_open)
def emit_status_update(self, message, is_running):
    """Forward a status change from the sharer through ``status_updated``."""
    signal = self.status_updated
    signal.emit(message, is_running)
def update_status(self, message, is_running):
    """Refresh the whole UI to match a status change.

    A message containing "正在" marks a transition in progress, during which
    the toggle button is disabled. Inputs are read-only and greyed out while
    running or transitioning. When fully stopped, the info/log panes and QR
    codes are cleared.
    """
    self.statusBar().showMessage(message)
    is_transitioning = "正在" in message

    # 1. Toggle-button caption and colour (red = stop, green = start)
    if is_running:
        caption = "停止共享"
        button_style = "QPushButton { background-color: #f44336; color: white; font-weight: bold; padding: 5px 15px; border: none; border-radius: 4px; } QPushButton:hover { background-color: #d32f2f; }"
    else:
        caption = "启动共享"
        button_style = "QPushButton { background-color: #4CAF50; color: white; font-weight: bold; padding: 5px 15px; border: none; border-radius: 4px; } QPushButton:hover { background-color: #45a049; }"
    self.toggle_share_btn.setText(caption)
    self.toggle_share_btn.setStyleSheet(button_style)

    # 2. Enable/disable the buttons
    self.toggle_share_btn.setEnabled(not is_transitioning)
    self.browser_btn.setEnabled(is_running and not is_transitioning)

    # 3. Inputs become read-only with a grey background while locked
    disable_edit = is_running or is_transitioning
    if disable_edit:
        input_style = "QLineEdit { background-color: #e0e0e0; color: #555555; }"
    else:
        input_style = ""
    locked_inputs = (self.path_entry, self.port_entry,
                     self.password_entry, self.admin_password_entry)
    for field in locked_inputs:
        field.setReadOnly(disable_edit)
        field.setStyleSheet(input_style)
    # The browse button follows the same rule as the inputs.
    if hasattr(self, 'select_path_btn'):
        self.select_path_btn.setEnabled(not disable_edit)

    # 4. Tray tooltip
    if is_running:
        self.toggle_share_btn.setFocus()
        self.tray_icon.setToolTip("文件共享工具 - 运行中")
    else:
        self.tray_icon.setToolTip("文件共享工具 - 已停止")

    # 5. Clean up once fully stopped
    if is_running or is_transitioning:
        return
    self.info_text.clear()
    self.log_text.clear()
    self.log_timer.stop()
    self.log_queue = None
    self.clear_qrcodes()
    self.qr_container.hide()
def update_log_display(self):
    """Append every queued access-log entry to the log pane."""
    pending = self.log_queue
    if not pending:
        return
    while not pending.empty():
        try:
            timestamp, ip, request = pending.get_nowait()
        except queue.Empty:
            break
        self.log_text.append(f"[{timestamp}] {ip} - {request}")
def clear_qrcodes(self):
    """Remove all QR items from the strip and reset the selection state."""
    layout = self.qr_layout
    # The two stretch items created with the layout stay in place.
    while layout.count() > 2:
        taken = layout.takeAt(1)
        if not taken:
            continue
        if taken.widget():
            taken.widget().deleteLater()
    self.qr_items = []
    self.selected_qr_index = None
def select_qr(self, index):
    """Highlight the QR item with ``index`` and hide all other items."""
    self.selected_qr_index = index
    for entry in self.qr_items:
        if entry['index'] != index:
            entry['widget'].hide()
            continue
        entry['widget'].show()
        entry['qr_label'].setStyleSheet("border: 3px solid #4CAF50; background: white; border-radius: 5px;")
        entry['text_label'].setStyleSheet(
            "font-weight: bold; color: #4CAF50; padding: 5px; background: #e8f5e9; border-radius: 3px;")
def toggle_qr_selection(self, index):
    """Select the clicked QR item, or clear the selection if it is already selected."""
    if self.selected_qr_index != index:
        self.select_qr(index)
        return
    self.cancel_qr_selection()
def cancel_qr_selection(self):
    """Clear the selection: show every QR item again with the neutral style."""
    self.selected_qr_index = None
    neutral_qr = "border: 2px solid #ccc; background: white; border-radius: 5px;"
    neutral_text = "font-weight: bold; color: #333; padding: 5px; border-radius: 3px;"
    for entry in self.qr_items:
        entry['widget'].show()
        entry['qr_label'].setStyleSheet(neutral_qr)
        entry['text_label'].setStyleSheet(neutral_text)
def show_qrcodes(self, urls):
    """Rebuild the QR strip from ``urls``, a list of ``(label, url)`` pairs.

    Each item shows a QR code with its label underneath; clicking either one
    toggles the selection of that item. An empty list hides the strip. Any
    failure is logged and otherwise ignored.
    """
    try:
        self.clear_qrcodes()
        if not urls:
            self.qr_container.hide()
            return
        self.qr_items = []
        self.selected_qr_index = None
        for index, (label_text, url) in enumerate(urls):
            item_widget = QWidget()
            item_widget.setProperty("qr_index", index)
            item_box = QVBoxLayout(item_widget)
            item_box.setContentsMargins(5, 5, 5, 5)
            item_box.setSpacing(5)

            # Render the code into an in-memory PNG and load it as a pixmap.
            png_buffer = io.BytesIO()
            segno.make(url, error='L').save(png_buffer, kind='png', scale=4, border=1)
            pixmap = QPixmap()
            pixmap.loadFromData(png_buffer.getvalue())
            if pixmap.isNull():
                continue

            qr_label = ClickableLabel()
            qr_label.setFixedSize(150, 150)
            qr_label.setScaledContents(True)
            qr_label.setStyleSheet("border: 2px solid #ccc; background: white; border-radius: 5px;")
            qr_label.setPixmap(pixmap)
            qr_label.setCursor(Qt.PointingHandCursor)
            # idx is bound as a default so each handler keeps its own index.
            qr_label.clicked.connect(lambda idx=index: self.toggle_qr_selection(idx))

            text_label = ClickableLabel(label_text)
            text_label.setAlignment(Qt.AlignCenter)
            text_label.setStyleSheet("font-weight: bold; color: #333; padding: 5px; border-radius: 3px;")
            text_label.setCursor(Qt.PointingHandCursor)
            text_label.clicked.connect(lambda idx=index: self.toggle_qr_selection(idx))

            for label in (qr_label, text_label):
                item_box.addWidget(label, 0, Qt.AlignCenter)
            item_widget.setStyleSheet("QWidget { background: transparent; border-radius: 8px; }")
            self.qr_items.append({
                'widget': item_widget,
                'qr_label': qr_label,
                'text_label': text_label,
                'index': index
            })
            # Insert before the last layout item (the trailing stretch).
            self.qr_layout.insertWidget(self.qr_layout.count() - 1, item_widget)
        self.qr_container.show()
    except Exception as e:
        print(f"二维码生成崩溃: {e}")
def update_ui_after_start(self, success, result):
    """Show the outcome of a start attempt.

    On success ``result`` is the info dict returned by the sharer: the info
    pane is filled, log polling starts (once per second) and QR codes are
    shown for every LAN URL, or for the local URL when there is none. On
    failure ``result`` is the error message, shown in the info pane and in
    an error dialog.
    """
    if not success:
        self.info_text.setText(f"错误: {result}")
        QMessageBox.critical(self, "错误", f"启动失败: {result}")
        self.qr_container.hide()
        return

    network_urls = result['network_urls']
    access_line = "\U0001F512 访问:需密码\n" if result.get('has_password') else "\U0001F513 访问:公开\n"
    if result.get('has_admin_password'):
        admin_line = "\U0001F6E1 管理:需密码 (上传/删除/重命名)\n"
    else:
        admin_line = "\U0001F4DD 管理:公开 (无保护)\n"
    pieces = [
        f"\U0001F4C1 共享路径:{result['folder']}\n",
        f"\U0001F3F7 本机端口:{result['port']}\n",
        access_line,
        admin_line,
        f"\U0001F4BB 本机访问:{result['local_url']}\n",
        "\U0001F310 局域网址:",
        "\n".join(network_urls) if network_urls else "(未找到)",
        "\n",
    ]
    self.info_text.setText("".join(pieces))

    self.log_queue = result.get('log_queue')
    if self.log_queue:
        self.log_timer.start(1000)

    # QR labels show the address without the URL scheme.
    qr_sources = network_urls if network_urls else [result['local_url']]
    qr_urls = [
        (url.replace('http://', '').replace('https://', ''), url)
        for url in qr_sources
    ]
    self.show_qrcodes(qr_urls)
def auto_start_share(self):
    """Apply the initial path/port to the form and start sharing if possible."""
    if self.initial_path and Path(self.initial_path).exists():
        path = self.initial_path
    else:
        path = self.path_entry.text()
    port = None
    if self.initial_port and 1 <= self.initial_port <= 65535:
        port = self.initial_port

    if path:
        self.path_entry.setText(path)
    if port:
        self.port_entry.setText(str(port))
    if not self.auto_start:
        return
    if Path(path).exists():
        self.start_share()
def check_menu_exists(self):
    """Return True when the Explorer context-menu key for this tool exists.

    Always False on platforms other than Windows.
    """
    if platform.system() != 'Windows':
        return False
    try:
        # The context manager closes the handle even on an unexpected error.
        with winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, f"Directory\\shell\\{self.reg_key_name}"):
            return True
    except OSError:
        # Registry failures (missing key, access denied) surface as OSError.
        return False
def check_startup_exists(self):
    """Return True when the per-user Run key has an autostart value for this tool.

    Always False on platforms other than Windows.
    """
    if platform.system() != 'Windows':
        return False
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Run", 0,
                            winreg.KEY_READ) as key:
            # Raises FileNotFoundError (an OSError) when the value is absent.
            winreg.QueryValueEx(key, self.startup_reg_key_name)
        return True
    except OSError:
        return False
def update_menu_btn_text(self):
    """Set the context-menu button caption to the action it will perform."""
    button = self.menu_btn
    if not button:
        return
    if platform.system() != 'Windows':
        # The feature needs the Windows registry; disable it elsewhere.
        button.setText("右键菜单(仅Win)")
        button.setEnabled(False)
        return
    caption = "移除右键" if self.check_menu_exists() else "添加右键"
    button.setText(caption)
def update_startup_btn_text(self):
    """Set the autostart button caption to the action it will perform."""
    button = self.startup_btn
    if not button:
        return
    if platform.system() != 'Windows':
        # Autostart is implemented through the Windows registry only.
        button.setText("开机自启(仅Win)")
        button.setEnabled(False)
        return
    caption = "取消自启" if self.check_startup_exists() else "开机自启"
    button.setText(caption)
def toggle_right_click_menu(self):
    """Add or remove the Explorer context-menu entry (Windows only)."""
    if platform.system() != 'Windows':
        return
    try:
        entry_present = self.check_menu_exists()
        change = self.remove_from_right_click_menu if entry_present else self.add_to_right_click_menu
        change()
        self.update_menu_btn_text()
    except Exception as e:
        QMessageBox.critical(self, "错误", f"操作失败:{str(e)}")
def toggle_startup(self):
    """Enable auto-start if it is currently off, otherwise disable it.

    Returns immediately when the platform is not Windows. After the change
    the toggle button caption is refreshed. Any exception raised along the
    way is shown in a critical message box instead of propagating.
    """
    if platform.system() != 'Windows':
        return
    try:
        # Pick the operation first, then run it and refresh the button.
        operation = (
            self.remove_startup
            if self.check_startup_exists()
            else self.add_startup
        )
        operation()
        self.update_startup_btn_text()
    except Exception as e:
        QMessageBox.critical(self, "错误", f"操作失败:{str(e)}")
def add_startup(self):
    r"""Register the application to launch at login for the current user.

    Writes a ``REG_SZ`` value named ``self.startup_reg_key_name`` under
    ``HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Run``.
    When ``IS_FROZEN`` is true the command is the quoted executable path
    (``sys.argv[0]``); otherwise it is the quoted interpreter followed by the
    quoted path of this script. A confirmation dialog is shown on success.

    Raises:
        OSError: if the registry key cannot be opened or written.
    """
    if IS_FROZEN:
        exe_path = os.path.abspath(sys.argv[0])
        command_str = f'"{exe_path}"'
    else:
        python_exe_path = sys.executable
        script_path = os.path.abspath(__file__)
        command_str = f'"{python_exe_path}" "{script_path}"'
    # The context manager closes the handle even if SetValueEx raises.
    with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Run", 0,
                        winreg.KEY_SET_VALUE) as run_key:
        winreg.SetValueEx(run_key, self.startup_reg_key_name, 0, winreg.REG_SZ, command_str)
    QMessageBox.information(self, "成功", "已开启开机自启。")
def remove_startup(self):
    r"""Remove the application's auto-start value for the current user.

    Deletes the value named ``self.startup_reg_key_name`` from
    ``HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Run``.
    The removal is best-effort: registry errors are ignored and the
    confirmation dialog is shown regardless.
    """
    try:
        # The context manager closes the handle even if DeleteValue raises.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Run", 0,
                            winreg.KEY_SET_VALUE) as run_key:
            winreg.DeleteValue(run_key, self.startup_reg_key_name)
    except OSError:
        # Deliberately ignored (e.g. the value is already absent).
        pass
    QMessageBox.information(self, "成功", "已取消开机自启。")
def add_to_right_click_menu(self):
    r"""Create the "share this folder" context-menu keys in the registry.

    Under ``HKEY_CLASSES_ROOT`` two entries named ``self.reg_key_name`` are
    written, each with a caption, an ``Icon`` value and a ``command`` subkey:

    * ``Directory\shell`` - the command receives ``"%1"``.
    * ``Directory\Background\shell`` - the command receives ``"%V"``.

    When ``IS_FROZEN`` is true the command launches the executable
    (``sys.argv[0]``); otherwise it launches the interpreter with this
    script. A confirmation dialog is shown on success.

    Raises:
        OSError: if a registry key cannot be created or written.
    """
    menu_name = "共享此文件夹"
    if IS_FROZEN:
        exe_path = os.path.abspath(sys.argv[0])
        launcher = f'"{exe_path}"'
        icon_path = f'"{exe_path}",0'
    else:
        python_exe_path = sys.executable
        script_path = os.path.abspath(__file__)
        launcher = f'"{python_exe_path}" "{script_path}"'
        icon_path = f'"{python_exe_path}",0'

    # (menu key, command line) for each of the two registry locations.
    entries = (
        (f"Directory\\shell\\{self.reg_key_name}", f'{launcher} "%1"'),
        (f"Directory\\Background\\shell\\{self.reg_key_name}", f'{launcher} "%V"'),
    )
    for menu_key_path, command_str in entries:
        # Context managers close each handle even if a write fails midway.
        with winreg.CreateKey(winreg.HKEY_CLASSES_ROOT, menu_key_path) as menu_key:
            winreg.SetValueEx(menu_key, "", 0, winreg.REG_SZ, menu_name)
            winreg.SetValueEx(menu_key, "Icon", 0, winreg.REG_SZ, icon_path)
        with winreg.CreateKey(winreg.HKEY_CLASSES_ROOT, f"{menu_key_path}\\command") as command_key:
            winreg.SetValueEx(command_key, "", 0, winreg.REG_SZ, command_str)
    QMessageBox.information(self, "成功", "右键菜单添加成功。")
def remove_from_right_click_menu(self):
    r"""Delete the context-menu keys created for ``self.reg_key_name``.

    Removes the ``command`` subkey and then the entry itself under both
    ``Directory\shell`` and ``Directory\Background\shell`` in
    ``HKEY_CLASSES_ROOT``. Each key is deleted independently, so a missing
    ``command`` subkey does not prevent its parent from being removed.

    The removal is best-effort: registry errors are ignored and the
    confirmation dialog is shown regardless.
    """
    # Children are listed before their parents; a key that still has
    # subkeys cannot be deleted.
    key_paths = (
        f"Directory\\shell\\{self.reg_key_name}\\command",
        f"Directory\\shell\\{self.reg_key_name}",
        f"Directory\\Background\\shell\\{self.reg_key_name}\\command",
        f"Directory\\Background\\shell\\{self.reg_key_name}",
    )
    for key_path in key_paths:
        try:
            winreg.DeleteKey(winreg.HKEY_CLASSES_ROOT, key_path)
        except OSError:
            # Deliberately ignored (e.g. the key is already absent).
            pass
    QMessageBox.information(self, "成功", "右键菜单已移除。")
def parse_arguments():
    """Parse ``sys.argv`` into ``(initial_path, initial_port, auto_start)``.

    Recognised arguments:

    * ``-p PORT`` / ``--port PORT`` (flag is case-insensitive): sets the port
      to ``int(PORT)``. A value that is not an integer is discarded together
      with the flag. The numeric range is not checked here.
    * The first remaining argument is the path to share, after stripping
      surrounding whitespace and quote characters.

    A trailing port flag with no value is dropped rather than being treated
    as the path.

    Returns:
        tuple: ``initial_path`` is the resolved path as ``str`` when the
        candidate exists, else ``None``; ``initial_port`` is an ``int`` or
        ``None``; ``auto_start`` is ``True`` only when a path was accepted.
    """
    initial_port = None
    initial_path = None
    auto_start = False

    positional = []
    pending = iter(sys.argv[1:])
    for arg in pending:
        if arg.lower() not in ('-p', '--port'):
            positional.append(arg)
            continue
        # Consume the flag's value from the same iterator.
        value = next(pending, None)
        if value is None:
            # Dangling flag at the end of the command line: ignore it.
            break
        try:
            initial_port = int(value)
        except ValueError:
            # Non-numeric port: flag and value are both discarded.
            pass

    if positional:
        path_candidate = positional[0].strip().strip('\'"')
        if path_candidate:
            path_obj = Path(path_candidate)
            if path_obj.exists():
                initial_path = str(path_obj.resolve())
                auto_start = True
    return initial_path, initial_port, auto_start
def main():
    """Create the Qt application, show the main window and run the event loop.

    Start-up values come from ``parse_arguments()``. The application is told
    not to quit when the last window closes, and the default font is chosen
    by platform. The process exits with the event loop's return code.
    """
    start_path, start_port, start_now = parse_arguments()

    application = QApplication(sys.argv)
    application.setQuitOnLastWindowClosed(False)

    if platform.system() == "Windows":
        family = "Microsoft YaHei"
    else:
        family = "Arial"
    application.setFont(QFont(family, 10))

    gui = FileShareGUI(start_path, start_port, start_now)
    gui.show()

    exit_code = application.exec_()
    sys.exit(exit_code)


# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()