# [Python] view as plain text / copy code  (forum code-widget header captured with the paste; not part of the program)
# -*- coding: utf-8 -*-
"""
Python依赖库下载与离线安装程序 - 精简核心代码
只包含最关键的功能实现,去掉了完整的UI设计
"""
import subprocess
import sys
import os
import threading
import re
import time
import urllib.request
# -------------------
# 隐藏CMD窗口的关键代码
# -------------------
# On Windows, build a STARTUPINFO that keeps the child process's console
# window hidden.  On every other platform the name is still defined (as None,
# which is subprocess's own default) so that the ``startupinfo=STARTUPINFO``
# call sites in this file do not raise NameError.
if sys.platform == 'win32':
    STARTUPINFO = subprocess.STARTUPINFO()
    STARTUPINFO.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    STARTUPINFO.wShowWindow = subprocess.SW_HIDE  # SW_HIDE == 0
else:
    STARTUPINFO = None
# -------------------
# 核心功能:获取正确的Python解释器路径
# -------------------
def get_python_exe():
    """Return the interpreter that pip commands should be run with.

    In a normal (non-frozen) run this is ``sys.executable``.  When the program
    is frozen (``sys.frozen`` is set, e.g. by PyInstaller), ``sys.executable``
    is the bundled program itself, so an installed interpreter is searched
    for instead, in this order:

    1. the ``ExecutablePath`` value of the ``PythonCore/<version>/InstallPath``
       registry key, first in the machine-wide hive and then in the per-user
       hive (only when ``winreg`` is importable);
    2. ``python.exe`` inside the directory named by ``PYTHONHOME``;
    3. the bare command ``'python'``, left for the OS to resolve.

    Returns:
        str: path of the interpreter, or ``'python'`` as the last resort.
    """
    if not getattr(sys, 'frozen', False):
        return sys.executable

    try:
        import winreg
    except ImportError:
        winreg = None  # not running on Windows: skip the registry lookup

    if winreg is not None:
        python_versions = ('3.8', '3.9', '3.10', '3.11', '3.12', '3.13')
        # Machine-wide installs are checked first (the original behaviour);
        # the per-user hive is an additional pass for "install just for me"
        # setups, which register under HKEY_CURRENT_USER.
        for hive in (winreg.HKEY_LOCAL_MACHINE, winreg.HKEY_CURRENT_USER):
            for version in python_versions:
                # Raw string: the backslashes are registry path separators,
                # not escape sequences.
                subkey = rf"SOFTWARE\Python\PythonCore\{version}\InstallPath"
                try:
                    with winreg.OpenKey(hive, subkey) as key:
                        python_path = winreg.QueryValueEx(key, "ExecutablePath")[0]
                except OSError:
                    # Key or value missing for this version/hive.
                    continue
                if python_path and os.path.exists(python_path):
                    return python_path

    # Next, honour PYTHONHOME if it points at a directory with python.exe.
    env_python = os.environ.get('PYTHONHOME')
    if env_python:
        candidate = os.path.join(env_python, 'python.exe')
        if os.path.exists(candidate):
            return candidate

    # Fall back to whatever "python" resolves to on the system.
    return 'python'
# -------------------
# 核心功能:依赖分析
# -------------------
# Used in addition to sys.stdlib_module_names (which only exists on
# Python 3.10+) so that common standard-library modules are never reported
# as pip packages.
_STDLIB_FALLBACK = frozenset({
    '__future__', 'abc', 'argparse', 'array', 'ast', 'asyncio', 'atexit',
    'base64', 'binascii', 'bisect', 'builtins', 'calendar', 'codecs',
    'collections', 'concurrent', 'configparser', 'contextlib', 'copy', 'csv',
    'ctypes', 'dataclasses', 'datetime', 'decimal', 'difflib', 'email',
    'enum', 'errno', 'fnmatch', 'fractions', 'functools', 'gc', 'getpass',
    'glob', 'gzip', 'hashlib', 'heapq', 'html', 'http', 'importlib',
    'inspect', 'io', 'itertools', 'json', 'locale', 'logging', 'math',
    'msvcrt', 'multiprocessing', 'numbers', 'operator', 'os', 'pathlib',
    'pickle', 'platform', 'pprint', 'queue', 'random', 're', 'secrets',
    'select', 'shlex', 'shutil', 'signal', 'socket', 'sqlite3', 'ssl',
    'statistics', 'string', 'struct', 'subprocess', 'sys', 'tarfile',
    'tempfile', 'textwrap', 'threading', 'time', 'tkinter', 'traceback',
    'types', 'typing', 'unittest', 'urllib', 'uuid', 'warnings', 'weakref',
    'winreg', 'xml', 'zipfile', 'zlib',
})

# Import name -> PyPI distribution name, for packages where the two differ.
_MODULE_TO_PACKAGE = {
    'cv2': 'opencv-python', 'PIL': 'Pillow', 'sklearn': 'scikit-learn',
    'yaml': 'PyYAML', 'bs4': 'beautifulsoup4', 'dotenv': 'python-dotenv',
    'dateutil': 'python-dateutil', 'serial': 'pyserial',
    'docx': 'python-docx', 'skimage': 'scikit-image',
}


def _collect_imported_modules(source):
    """Return the set of top-level module names imported by *source*."""
    import ast
    import warnings

    try:
        with warnings.catch_warnings():
            # Scanned files may trigger SyntaxWarnings; they are not ours.
            warnings.simplefilter('ignore')
            tree = ast.parse(source)
    except (SyntaxError, ValueError, RecursionError):
        # File is not parsable by this interpreter: fall back to the
        # line-based patterns, which only see column-0 import statements.
        found = set(re.findall(r'^import\s+([a-zA-Z0-9_]+)', source, re.MULTILINE))
        found.update(re.findall(r'^from\s+([a-zA-Z0-9_]+)', source, re.MULTILINE))
        return found

    found = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # "import a.b, c" -> {"a", "c"}
            found.update(alias.name.split('.')[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # level > 0 is a relative import, i.e. project-local code.
            found.add(node.module.split('.')[0])
    return found


def analyze_from_imports(project_path):
    """Guess a project's third-party dependencies from its import statements.

    Walks *project_path* (skipping virtual-environment, cache, VCS and
    ``node_modules`` directories), collects the top-level module imported by
    every ``.py`` file, then drops standard-library modules and modules that
    are defined by the project itself.  Remaining import names are mapped to
    their PyPI package name where the two are known to differ.

    Args:
        project_path: directory to scan.

    Returns:
        list[tuple[str, str]]: sorted ``(package_name, "")`` pairs; the second
        element is an (always empty) version specifier slot.
    """
    skip_dirs = {'venv', 'env', '.venv', '__pycache__', '.git', 'node_modules'}
    imported = set()
    local_modules = set()

    for root, dirs, files in os.walk(project_path):
        # Prune in place so os.walk does not descend into these directories.
        dirs[:] = [d for d in dirs if d not in skip_dirs]
        if '__init__.py' in files:
            # The directory is a package that belongs to the project.
            local_modules.add(os.path.basename(os.path.normpath(root)))
        for file in files:
            if not file.endswith('.py'):
                continue
            local_modules.add(file[:-3])
            try:
                # utf-8-sig strips a leading BOM, which would otherwise make
                # the source unparsable.
                with open(os.path.join(root, file), 'r',
                          encoding='utf-8-sig', errors='ignore') as f:
                    content = f.read()
            except OSError:
                continue  # unreadable file: best effort, skip it
            imported.update(_collect_imported_modules(content))

    # Compared case-insensitively, as before; built once rather than per name.
    stdlib = {
        name.lower()
        for name in _STDLIB_FALLBACK | set(getattr(sys, 'stdlib_module_names', ()))
    }
    third_party = {
        (_MODULE_TO_PACKAGE.get(module, module), "")
        for module in imported
        if module and module.lower() not in stdlib and module not in local_modules
    }
    # Sorted so the result is deterministic across runs.
    return sorted(third_party)
# -------------------
# 核心功能:镜像源管理与速度测试
# -------------------
class MirrorManager:
    """Holds the known PyPI index mirrors and picks the fastest reachable one."""

    def __init__(self):
        # Display name -> "simple" index URL (used as pip's --index-url by
        # the download/install helpers in this file).
        self.mirror_sources = {
            "官方源": "https://pypi.org/simple",
            "阿里云": "https://mirrors.aliyun.com/pypi/simple/",
            "腾讯云": "https://mirrors.cloud.tencent.com/pypi/simple/",
            "华为云": "https://mirrors.huaweicloud.com/repository/pypi/simple/",
            "清华大学": "https://pypi.tuna.tsinghua.edu.cn/simple/",
            "北京大学": "https://mirrors.pku.edu.cn/pypi/simple/",
            "豆瓣": "https://pypi.doubanio.com/simple/",
            "中科大": "https://pypi.mirrors.ustc.edu.cn/simple/"
        }
        # Key into mirror_sources of the mirror currently in use.
        self.selected_mirror = "官方源"

    def test_mirror_speed(self, mirror_name, mirror_url):
        """Time one request to a mirror.

        Fetches the first 1 KiB of the mirror's ``pip`` project page with a
        5-second timeout.

        Args:
            mirror_name: label echoed back in the result.
            mirror_url: base index URL of the mirror.

        Returns:
            tuple[str, float]: ``(mirror_name, seconds)``; ``seconds`` is
            ``float('inf')`` if the request failed for any reason.
        """
        try:
            test_url = f"{mirror_url.rstrip('/')}/pip/"
            # perf_counter is monotonic, so the measured interval cannot be
            # distorted by system clock adjustments.
            started = time.perf_counter()
            with urllib.request.urlopen(test_url, timeout=5) as response:
                response.read(1024)  # a small read is enough for timing
            return mirror_name, time.perf_counter() - started
        except Exception:
            # Deliberate best effort: any failure just marks the mirror
            # as unreachable.
            return mirror_name, float('inf')

    def select_fastest_mirror(self):
        """Probe every mirror in turn and select the quickest reachable one.

        On success ``self.selected_mirror`` is updated; if no mirror responds
        it is left unchanged.

        Returns:
            tuple: ``(mirror_name, seconds)`` of the fastest mirror, or
            ``(None, None)`` when none could be reached.
        """
        reachable = []
        for mirror_name, mirror_url in self.mirror_sources.items():
            name, elapsed = self.test_mirror_speed(mirror_name, mirror_url)
            if elapsed != float('inf'):
                reachable.append((name, elapsed))
        if not reachable:
            return None, None
        # min() keeps the first entry on ties, like the stable sort it replaces.
        fastest_mirror, elapsed = min(reachable, key=lambda item: item[1])
        self.selected_mirror = fastest_mirror
        return fastest_mirror, elapsed
# -------------------
# 核心功能:依赖下载与安装
# -------------------
def download_packages(packages, download_path, mirror_manager):
    """Download packages into a local directory with ``pip download``.

    Each package is fetched in its own pip invocation, using the index URL of
    ``mirror_manager.selected_mirror``.  Progress and failures are printed; a
    failure for one package does not stop the remaining ones.

    Args:
        packages: iterable of ``(name, version_spec)`` pairs; ``version_spec``
            is appended to the name verbatim (e.g. ``"==1.2"``) or may be
            empty.
        download_path: destination directory, created if missing.
        mirror_manager: object exposing ``mirror_sources`` and
            ``selected_mirror``.
    """
    os.makedirs(download_path, exist_ok=True)
    python_exe = get_python_exe()
    for name, version in packages:
        pkg_spec = f"{name}{version}" if version else name
        print(f"正在下载: {pkg_spec}")
        try:
            mirror_url = mirror_manager.mirror_sources[mirror_manager.selected_mirror]
            cmd = [
                python_exe, "-m", "pip", "download",
                pkg_spec,
                "-d", download_path,
                "--no-cache-dir",
                "--index-url", mirror_url,
            ]
            # errors='replace': pip's output is not guaranteed to be UTF-8
            # (e.g. a localized Windows console code page).  With strict
            # decoding a single undecodable byte raises UnicodeDecodeError
            # and a successful download would be reported as an error.
            result = subprocess.run(
                cmd, capture_output=True, text=True,
                encoding='utf-8', errors='replace',
                startupinfo=STARTUPINFO
            )
            if result.returncode == 0:
                print(f"✓ {pkg_spec} 下载成功")
            else:
                print(f"✗ {pkg_spec} 下载失败: {result.stderr}")
        except Exception as e:
            # Best effort: report and move on to the next package.
            print(f"✗ {pkg_spec} 下载出错: {e}")
def install_online_packages(packages, mirror_manager):
    """Install packages with ``pip install``, skipping ones already satisfied.

    A package is skipped when it is already installed and either no version
    was requested, or an exact ``==`` pin matches the installed version.
    Everything else is installed from the index URL of
    ``mirror_manager.selected_mirror``.  Progress and failures are printed; a
    failure for one package does not stop the remaining ones.

    Args:
        packages: iterable of ``(name, version_spec)`` pairs; ``version_spec``
            is appended to the name verbatim (e.g. ``"==1.2"``) or may be
            empty.
        mirror_manager: object exposing ``mirror_sources`` and
            ``selected_mirror``.
    """
    python_exe = get_python_exe()

    def normalize(pkg_name):
        # PEP 503 name normalization: runs of "-", "_" and "." are equivalent
        # and names are case-insensitive, so "typing_extensions" reported by
        # pip matches a requested "typing-extensions".
        return re.sub(r'[-_.]+', '-', pkg_name).lower()

    def get_installed_packages():
        """Return {normalized name: version} from ``pip list``; {} on failure."""
        try:
            result = subprocess.run(
                [python_exe, "-m", "pip", "list", "--format=freeze"],
                capture_output=True, text=True,
                # errors='replace': pip output is not guaranteed to be UTF-8.
                encoding='utf-8', errors='replace',
                startupinfo=STARTUPINFO
            )
            found = {}
            for line in result.stdout.splitlines():
                # partition never raises, so one odd line cannot discard the
                # whole listing the way a failed tuple-unpack would.
                pkg, sep, pkg_version = line.strip().partition('==')
                if sep and pkg:
                    found[normalize(pkg)] = pkg_version
            return found
        except Exception:
            return {}

    installed = get_installed_packages()
    for name, version in packages:
        pkg_spec = f"{name}{version}" if version else name

        # Skip work that is already done.
        installed_version = installed.get(normalize(name))
        if installed_version is not None:
            if version and version.startswith('=='):
                if installed_version == version[2:]:
                    print(f"⊘ {name}=={installed_version} 已安装相同版本,跳过")
                    continue
            elif not version:
                print(f"⊘ {name} 已安装({installed_version}),跳过")
                continue

        print(f"正在安装: {pkg_spec}")
        try:
            mirror_url = mirror_manager.mirror_sources[mirror_manager.selected_mirror]
            cmd = [python_exe, "-m", "pip", "install", pkg_spec,
                   "--index-url", mirror_url]
            result = subprocess.run(
                cmd, capture_output=True, text=True,
                encoding='utf-8', errors='replace',
                startupinfo=STARTUPINFO
            )
            if result.returncode == 0:
                print(f"✓ {pkg_spec} 安装成功")
            else:
                print(f"✗ {pkg_spec} 安装失败: {result.stderr}")
        except Exception as e:
            # Best effort: report and move on to the next package.
            print(f"✗ {pkg_spec} 安装出错: {e}")
# -------------------
# 使用示例
# -------------------
if __name__ == "__main__":
    # Step 1: scan the current directory for third-party imports.
    project_path = "./"
    dependencies = analyze_from_imports(project_path)
    print(f"分析到的依赖: {dependencies}")

    # Step 2: probe all mirrors; report the winner if any was reachable.
    mirror_manager = MirrorManager()
    fastest_mirror, speed = mirror_manager.select_fastest_mirror()
    if fastest_mirror:
        print(f"最快的镜像源是: {fastest_mirror} (响应时间: {speed:.3f}秒)")

    # Step 3 (left disabled in this example): download for offline use.
    # download_path = "./offline_packages"
    # download_packages(dependencies, download_path, mirror_manager)

    # Step 4 (left disabled in this example): install from the mirror.
    # install_online_packages(dependencies, mirror_manager)