import sys
import os
import requests
import json
import logging
import time
from datetime import datetime
from PyQt5.QtWidgets import (
QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit,
QPushButton, QListWidget, QMessageBox, QInputDialog, QCheckBox,
QProgressBar, QFileDialog, QGroupBox, QGridLayout, QComboBox
)
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QSettings
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import re
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("chaoxing_downloader.log", encoding='utf-8'),
logging.StreamHandler()
]
)
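# ChaoXingWorkDownloader 封装完整抓取流程:
# login() 登录 -> get_courses() 解析课程列表 -> select_course() 选定课程
# -> get_works() 构造作业列表 URL 并翻页抓取 -> download_work_file() 下载单个作业的附件或正文。
# 下方的 MainWindow 只负责按这个顺序调用,并在界面上展示结果。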
class ChaoXingWorkDownloader:
def __init__(self):
self.session = requests.Session()
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
}
self.class_list = []
self.current_class_name = ""
self.work_list = []
self.download_folder = os.path.join(os.path.expanduser("~"), "Downloads", "ChaoXing")
# 确保下载目录存在
os.makedirs(self.download_folder, exist_ok=True)
    # 登录:向 passport 接口提交账号密码,并把登录 URL 记入日志便于排查
def login(self, user, password):
logging.info("正在尝试登录...")
login_url = "https://passport2.chaoxing.com/api/login"
logging.info(f"登录URL: {login_url}")
data = {
"name": user,
"pwd": password,
"loginType": "1",
"verify": "0",
"schoolid": ""
}
try:
res = self.session.post(login_url, data=data, headers=self.headers, timeout=15)
json_data = res.json()
if json_data.get("result") is True:
logging.info("登录成功")
return True, ""
else:
error_msg = json_data.get("msg", "未知错误")
logging.error(f"登录失败: {error_msg}")
return False, error_msg
except requests.exceptions.Timeout:
logging.error("登录超时,请检查网络连接")
return False, "登录超时,请检查网络连接"
except requests.exceptions.ConnectionError:
logging.error("网络连接错误")
return False, "网络连接错误"
except Exception as e:
logging.error(f"登录异常: {str(e)}")
return False, f"登录异常: {str(e)}"
    # 获取课程列表:请求课程列表接口,记录 URL 并解析返回的 HTML
def get_courses(self):
logging.info("获取课程列表中...")
course_url = "https://mooc2-ans.chaoxing.com/visit/courses/list?v=1652629452722&rss=1&start=0&size=500&catalogId=0&searchname="
logging.info(f"课程列表URL: {course_url}")
try:
res = self.session.get(course_url, headers=self.headers, timeout=15)
res.raise_for_status()
except requests.exceptions.Timeout:
logging.error("获取课程列表超时")
return []
except requests.exceptions.ConnectionError:
logging.error("网络连接错误")
return []
except Exception as e:
logging.error(f"请求课程列表失败: {str(e)}")
return []
soup = BeautifulSoup(res.text, 'html.parser')
items = soup.select('li.course')
if not items:
logging.warning("无法找到课程列表,请确认已登录")
return []
self.class_list = []
for idx, item in enumerate(items, start=1):
try:
name_elem = item.select_one('.course-name')
name = name_elem.text.strip() if name_elem else "未知课程"
link_elem = item.select_one('a[href]')
link = link_elem['href'] if link_elem else ""
if not link.startswith("http"):
link = "https://mooc1.chaoxing.com" + link
# 获取课程图片
img_elem = item.select_one('img[src]')
img_url = img_elem['src'] if img_elem else ""
self.class_list.append({
"index": idx,
"name": name,
"url": link,
"img_url": img_url
})
logging.debug(f"找到课程: {name}")
except Exception as e:
logging.error(f"解析课程项失败: {str(e)}")
continue
logging.info(f"共获取到 {len(self.class_list)} 个课程")
return self.class_list
def select_course(self, index):
if 0 <= index < len(self.class_list):
self.current_class_name = self.class_list[index]["name"]
logging.info(f"已选择课程: {self.current_class_name}")
return self.class_list[index]
else:
logging.warning("输入无效,请重新选择")
return None
def get_course_detail_url(self, course_url):
"""获取跳转后的 URL 并修改 pageHeader 参数"""
try:
res = self.session.get(course_url, headers=self.headers, allow_redirects=False, timeout=15)
if res.status_code == 302:
redirect_url = res.headers["Location"]
logging.info(f"跳转到: {redirect_url}")
return self.modify_pageheader(redirect_url)
else:
logging.info("未发生跳转")
return course_url
except requests.exceptions.Timeout:
logging.error("获取跳转 URL 超时")
return course_url
except requests.exceptions.ConnectionError:
logging.error("网络连接错误")
return course_url
except Exception as e:
logging.error(f"获取跳转 URL 失败: {str(e)}")
return course_url
def modify_pageheader(self, url):
"""修改 URL 中的 pageHeader 参数"""
parsed = urlparse(url)
query_params = parse_qs(parsed.query)
query_params["pageHeader"] = ["8"] # 修改为 8
new_query = urlencode(query_params, doseq=True)
new_url = parsed._replace(query=new_query)
return urlunparse(new_url)
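    # modify_pageheader 效果示意(下面的 URL 仅为演示用的假设示例,参数值为占位符):
    #   输入: https://mooc1.chaoxing.com/some/path?courseid=100&clazzid=200&pageHeader=1
    #   输出: https://mooc1.chaoxing.com/some/path?courseid=100&clazzid=200&pageHeader=8
    # 即只把 pageHeader 固定改写为 8,其余查询参数原样保留。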
def extract_course_params(self, html):
"""从课程详情页提取参数"""
soup = BeautifulSoup(html, 'html.parser')
params = {}
# 提取隐藏字段中的参数
inputs = soup.select('input[type="hidden"]')
for input_tag in inputs:
name = input_tag.get('id', '').lower()
value = input_tag.get('value', '')
if value:
if name == 'courseid':
params['courseId'] = value
elif name == 'clazzid':
params['classId'] = value
elif name == 'cpi':
params['cpi'] = value
elif name == 'enc':
params['enc'] = value
elif name == 'workenc':
params['enc'] = value # workEnc 作为 enc 使用
return params
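    # extract_course_params 期望课程详情页中带有类似下面的隐藏字段(结构示意,具体值因课程而异):
    #   <input type="hidden" id="courseid" value="...">
    #   <input type="hidden" id="clazzid" value="...">
    #   <input type="hidden" id="cpi" value="...">
    #   <input type="hidden" id="workEnc" value="...">
    # 匹配时会把 id 统一转成小写,最终得到 {'courseId', 'classId', 'cpi', 'enc'} 四个键。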
def construct_work_list_url(self, course_detail_html):
"""构造作业列表 URL"""
try:
params = self.extract_course_params(course_detail_html)
if not all(k in params for k in ['courseId', 'classId', 'cpi', 'enc']):
logging.error("无法提取完整参数,跳过构造 URL")
logging.debug(f"提取的参数: {params}")
return None
base_url = "https://mooc1.chaoxing.com/mooc-ans/mooc2/work/list"
query_params = {
"courseId": params['courseId'],
"classId": params['classId'],
"cpi": params['cpi'],
"enc": params['enc'],
"status": "0",
"pageNum": "1",
"topicId": "0"
}
url = f"{base_url}?{urlencode(query_params)}"
logging.info(f"构造的作业列表URL: {url}")
return url
except Exception as e:
logging.error(f"构造作业列表URL失败: {str(e)}")
return None
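    # construct_work_list_url 生成的 URL 形如(参数值为占位符):
    #   https://mooc1.chaoxing.com/mooc-ans/mooc2/work/list?courseId=...&classId=...&cpi=...&enc=...&status=0&pageNum=1&topicId=0
    # get_works() 之后会用正则把其中的 pageNum 替换成具体页码来翻页。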
def parse_work_list(self, html):
"""解析作业列表页面"""
try:
soup = BeautifulSoup(html, 'html.parser')
items = soup.select('li[data]') # 根据实际页面结构调整选择器
self.work_list = []
for item in items:
try:
link = item.get('data', '')
title_elem = item.select_one('.overHidden2.fl')
status_elem = item.select_one('.status.fl')
if not title_elem or not status_elem:
continue
title = title_elem.text.strip()
status = status_elem.text.strip()
logging.info(f"找到作业: {title} - {status}")
logging.debug(f"作业链接: {link}")
self.work_list.append({
"title": title,
"status": status,
"detail_url": link
})
except Exception as e:
logging.error(f"解析作业项失败: {str(e)}")
continue
return self.work_list
except Exception as e:
logging.error(f"解析作业列表失败: {str(e)}")
return []
def get_works(self, course_url):
logging.info(f"正在进入课程: {self.current_class_name}")
try:
modified_url = self.get_course_detail_url(course_url)
logging.info(f"正在访问课程详情页: {modified_url}")
res = self.session.get(modified_url, headers=self.headers, timeout=15)
# 构造作业列表 URL
work_list_url = self.construct_work_list_url(res.text)
if not work_list_url:
logging.error("无法构造作业列表 URL")
return []
logging.info(f"正在访问作业列表页: {work_list_url}")
work_res = self.session.get(work_list_url, headers=self.headers, timeout=15)
html = work_res.text
# 1. 解析总页数
page_num = 1
page_num_match = re.search(r'pageNum\s*:\s*(\d+)', html)
if page_num_match:
page_num = int(page_num_match.group(1))
logging.info(f"共 {page_num} 页作业列表")
# 2. 遍历所有页,收集所有作业
all_works = []
for page in range(1, page_num + 1):
page_url = re.sub(r'pageNum=\d+', f'pageNum={page}', work_list_url)
logging.info(f"抓取第 {page} 页: {page_url}")
page_res = self.session.get(page_url, headers=self.headers, timeout=15)
self.parse_work_list(page_res.text)
all_works.extend(self.work_list)
# 防止请求过快
time.sleep(0.5)
self.work_list = all_works
logging.info(f"共获取到 {len(self.work_list)} 个作业")
return self.work_list
except requests.exceptions.Timeout:
logging.error("获取作业列表超时")
return []
except requests.exceptions.ConnectionError:
logging.error("网络连接错误")
return []
except Exception as e:
logging.error(f"获取作业列表失败: {str(e)}")
return []
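    # download_work_file 按三层策略取文件:
    # 1. 页面中存在 span[data][type] 附件时,走 ueditorupload/read 阅读页查找下载按钮(带重试);
    # 2. 没有附件但存在 .mark_content 正文时,把正文和整页 HTML 保存为本地文件;
    # 3. 否则尝试 li[data] 里的作业链接,逐个进入详情页查找"下载"链接。
    # 所有文件保存在 下载目录/作业标题/ 子文件夹下,文件名经 _sanitize_filename 清洗。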
def download_work_file(self, detail_url, save_folder=None, progress_callback=None):
if save_folder is None:
save_folder = self.download_folder
try:
logging.info(f"正在访问作业详情页: {detail_url}")
try:
res = self.session.get(detail_url, headers=self.headers, timeout=15)
res.raise_for_status() # 确保请求成功
except requests.exceptions.RequestException as e:
logging.error(f"访问作业详情页失败: {str(e)}")
return [], f"访问作业详情页失败: {str(e)}"
soup = BeautifulSoup(res.text, 'html.parser')
# 获取作业标题作为文件夹名
title_elem = soup.select_one('.mark_title')
work_title = title_elem.text.strip() if title_elem else "未知作业"
# 创建以作业名命名的子文件夹
work_folder = os.path.join(save_folder, self._sanitize_filename(work_title))
try:
os.makedirs(work_folder, exist_ok=True)
except OSError as e:
logging.error(f"创建作业文件夹失败: {str(e)}")
return [], f"创建作业文件夹失败: {str(e)}"
# 查找带有 data 属性的 span 元素 (支持多种文件类型)
file_spans = soup.select('span[data][type]')
# 如果没有找到可下载文件,尝试保存作业内容
if not file_spans:
logging.info("当前页面未找到可下载文件元素,尝试保存作业内容")
# 提取作业内容
content_elem = soup.select_one('.mark_content')
if content_elem:
content = content_elem.get_text(strip=True)
if content:
                        try:  # 将作业正文与整页 HTML 写入本地文件
# 保存作业内容到文本文件
content_file = os.path.join(work_folder, "作业内容.txt")
with open(content_file, "w", encoding="utf-8") as f:
f.write(f"作业标题: {work_title}\n\n")
f.write(f"作业内容:\n{content}\n")
# 尝试保存整个HTML页面以备查看
html_file = os.path.join(work_folder, "作业页面.html")
with open(html_file, "w", encoding="utf-8") as f:
f.write(res.text)
logging.info(f"已保存作业内容到: {content_file}")
return [content_file, html_file], None
                        except Exception as e:
logging.error(f"保存作业内容失败: {str(e)}")
return [], f"保存作业内容失败: {str(e)}"
# 尝试查找作业列表中的链接
work_links = soup.select('li[data]')
if work_links:
logging.info(f"找到{len(work_links)}个作业链接")
downloaded_files = []
for work_link in work_links:
try:
# 获取作业详情页URL
work_url = work_link['data'].strip()
# 处理被反引号包围的URL
if work_url.startswith('`') and work_url.endswith('`'):
work_url = work_url.strip('`').strip()
elif '`' in work_url: # 处理可能的部分反引号
work_url = work_url.replace('`', '').strip()
logging.info(f"访问作业详情页: {work_url}")
# 访问作业详情页
work_res = self.session.get(work_url, headers=self.headers, timeout=15)
work_soup = BeautifulSoup(work_res.text, 'html.parser')
# 查找下载链接
download_links = []
# 1. 查找带有下载文本的链接
for a_tag in work_soup.find_all('a'):
if '下载' in a_tag.text and a_tag.has_attr('href'):
download_links.append(a_tag)
# 2. 查找带有附件标识的链接
attachment_links = work_soup.select('a.listSubmit')
download_links.extend(attachment_links)
if download_links:
for download_link in download_links:
try:
# 获取href属性并清理
href_value = download_link['href'].strip()
# 处理被反引号包围的URL
if href_value.startswith('`') and href_value.endswith('`'):
href_value = href_value.strip('`').strip()
logging.info(f"清理反引号后的链接: {href_value}")
elif '`' in href_value: # 处理可能的部分反引号
href_value = href_value.replace('`', '').strip()
logging.info(f"清理部分反引号后的链接: {href_value}")
# 处理相对URL
if href_value.startswith('/'):
href_value = f"https://mooc1.chaoxing.com{href_value}"
logging.info(f"处理相对路径后的链接: {href_value}")
elif not (
href_value.startswith('http://') or href_value.startswith('https://')):
href_value = f"https://mooc1.chaoxing.com/{href_value.lstrip('/')}"
logging.info(f"处理非标准URL后的链接: {href_value}")
final_download_url = href_value
file_name = download_link.text.strip() or f"附件_{len(downloaded_files) + 1}"
# 设置完整的 headers
download_headers = {
"User-Agent": self.headers["User-Agent"],
"Referer": work_url,
}
# 开始下载文件
logging.info(f"开始下载文件: {file_name}")
file_path = os.path.join(work_folder, self._sanitize_filename(file_name))
# 下载文件
with self.session.get(final_download_url, stream=True, headers=download_headers,
timeout=30) as r:
r.raise_for_status()
with open(file_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
logging.info(f"文件已保存至: {file_path}")
downloaded_files.append(file_path)
except Exception as e:
logging.error(f"下载文件失败: {str(e)}")
except Exception as e:
logging.error(f"处理作业链接失败: {str(e)}")
if downloaded_files:
return downloaded_files, None
# 如果没有找到内容元素,返回错误
return [], "未找到可下载文件或作业内容,但已创建作业文件夹"
# 以下代码只有在找到file_spans时才会执行
downloaded_files = []
for file_span in file_spans:
data_id = file_span['data']
file_type = file_span.get('type')
file_name = file_span.get('name', f"{data_id}.{file_type}")
# 构造阅读页 URL
read_url = f"https://mooc1.chaoxing.com/mooc-ans/ueditorupload/read?objectId={data_id}"
logging.info(f"正在访问阅读页: {read_url}")
# 添加重试机制
max_retries = 3
retry_count = 0
download_link = None
while retry_count < max_retries and not download_link:
try:
# 请求阅读页(更新 session cookie)
read_res = self.session.get(read_url, headers=self.headers, timeout=15)
read_soup = BeautifulSoup(read_res.text, 'html.parser')
# 尝试多种方式查找下载按钮
# 1. 标准选择器
download_link = read_soup.select_one('a.btnDown[href]')
# 2. 如果没找到,尝试查找任何带有下载文本的链接
if not download_link:
for a_tag in read_soup.find_all('a'):
if '下载' in a_tag.text and a_tag.has_attr('href'):
download_link = a_tag
break
# 3. 如果还没找到,尝试查找任何带有ico_dow类的元素的父级a标签
if not download_link:
ico_elem = read_soup.select_one('.ico_dow')
if ico_elem and ico_elem.parent and ico_elem.parent.parent and ico_elem.parent.parent.name == 'a':
download_link = ico_elem.parent.parent
if not download_link:
retry_count += 1
if retry_count < max_retries:
logging.warning(f"未找到下载链接,正在重试 ({retry_count}/{max_retries})")
time.sleep(2) # 等待2秒后重试
else:
# 保存HTML以便调试
debug_file = os.path.join(work_folder, f"debug_{data_id}.html")
with open(debug_file, "w", encoding="utf-8") as f:
f.write(read_res.text)
logging.warning(f"未找到文件 {file_name} 的下载按钮链接,已保存调试HTML到: {debug_file}")
except Exception as e:
retry_count += 1
logging.error(f"访问阅读页出错: {str(e)},重试 ({retry_count}/{max_retries})")
if retry_count >= max_retries:
break
time.sleep(2) # 等待2秒后重试
if not download_link:
continue
try:
# 获取href属性并清理
href_value = download_link['href'].strip()
# 处理被反引号包围的URL
if href_value.startswith('`') and href_value.endswith('`'):
href_value = href_value.strip('`').strip()
logging.info(f"清理反引号后的链接: {href_value}")
elif '`' in href_value: # 处理可能的部分反引号
href_value = href_value.replace('`', '').strip()
logging.info(f"清理部分反引号后的链接: {href_value}")
# 处理相对URL
if href_value.startswith('/'):
href_value = f"https://mooc1.chaoxing.com{href_value}"
logging.info(f"处理相对路径后的链接: {href_value}")
elif not (href_value.startswith('http://') or href_value.startswith('https://')):
href_value = f"https://mooc1.chaoxing.com/{href_value.lstrip('/')}"
logging.info(f"处理非标准URL后的链接: {href_value}")
final_download_url = href_value
# 设置完整的 headers
download_headers = {
"User-Agent": self.headers["User-Agent"],
"Referer": read_url,
}
# 开始下载文件
logging.info(f"开始下载文件: {file_name}")
file_path = os.path.join(work_folder, self._sanitize_filename(file_name))
# 获取文件大小
file_size_res = self.session.head(final_download_url, headers=download_headers, timeout=15)
file_size = int(file_size_res.headers.get('content-length', 0))
with self.session.get(final_download_url, stream=True, headers=download_headers,
timeout=30) as r:
r.raise_for_status()
downloaded_size = 0
with open(file_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
if progress_callback and file_size > 0:
progress = int((downloaded_size / file_size) * 100)
progress_callback(file_name, progress)
logging.info(f"文件已保存至: {file_path}")
downloaded_files.append(file_path)
                    # 下载成功,继续处理下一个附件
                except Exception as e:
                    logging.error(f"下载文件 {file_name} 失败: {str(e)}")
                    continue  # 跳过该附件,继续下载其余附件
if downloaded_files:
return downloaded_files, None
else:
# 即使没有下载成功任何文件,也返回空列表而不是None,避免调用方出错
return [], "未能成功下载任何文件,但已创建作业文件夹"
except requests.exceptions.Timeout:
logging.error("下载超时,请检查网络连接")
return [], "下载超时,请检查网络连接"
except requests.exceptions.ConnectionError:
logging.error("网络连接错误")
return [], "网络连接错误"
except Exception as e:
logging.error(f"下载文件失败: {str(e)}")
return [], f"下载文件失败: {str(e)}"
def _sanitize_filename(self, filename):
"""清理文件名,移除不合法字符"""
# 替换Windows文件名中不允许的字符
invalid_chars = ['\\', '/', ':', '*', '?', '"', '<', '>', '|']
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename
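# _sanitize_filename 示例:'期末作业: A/B' -> '期末作业_ A_B'(Windows 不允许的字符逐个替换为下划线)。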
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.downloader = ChaoXingWorkDownloader()
        self.download_thread = None  # 预留属性:当前版本在主线程中同步下载,尚未使用后台线程
self.settings = QSettings("ChaoXingDownloader", "Settings")
self.init_ui()
self.load_settings()
def init_ui(self):
self.setWindowTitle("超星作业下载工具")
self.resize(800, 600)
main_layout = QVBoxLayout()
# 登录区域
login_group = QGroupBox("登录信息")
login_layout = QGridLayout()
self.user_edit = QLineEdit()
self.user_edit.setPlaceholderText("账号")
self.pwd_edit = QLineEdit()
self.pwd_edit.setPlaceholderText("密码")
self.pwd_edit.setEchoMode(QLineEdit.Password)
self.remember_checkbox = QCheckBox("记住账号密码")
self.login_btn = QPushButton("登录")
self.login_btn.clicked.connect(self.login)
login_layout.addWidget(QLabel("账号:"), 0, 0)
login_layout.addWidget(self.user_edit, 0, 1)
login_layout.addWidget(QLabel("密码:"), 0, 2)
login_layout.addWidget(self.pwd_edit, 0, 3)
login_layout.addWidget(self.remember_checkbox, 0, 4)
login_layout.addWidget(self.login_btn, 0, 5)
login_group.setLayout(login_layout)
main_layout.addWidget(login_group)
# 课程区域
course_group = QGroupBox("课程列表")
course_layout = QVBoxLayout()
course_btn_layout = QHBoxLayout()
self.course_btn = QPushButton("获取课程")
self.course_btn.clicked.connect(self.get_courses)
self.course_btn.setEnabled(False)
course_btn_layout.addWidget(self.course_btn)
course_btn_layout.addStretch()
self.course_list = QListWidget()
self.course_list.setSelectionMode(QListWidget.SingleSelection)
course_layout.addLayout(course_btn_layout)
course_layout.addWidget(self.course_list)
course_group.setLayout(course_layout)
main_layout.addWidget(course_group)
# 作业区域
work_group = QGroupBox("作业列表")
work_layout = QVBoxLayout()
work_btn_layout = QHBoxLayout()
self.work_btn = QPushButton("获取作业")
self.work_btn.clicked.connect(self.get_works)
self.work_btn.setEnabled(False)
self.download_btn = QPushButton("下载选中作业")
self.download_btn.clicked.connect(self.download_selected_works)
self.download_btn.setEnabled(False)
self.download_all_btn = QPushButton("下载全部作业")
self.download_all_btn.clicked.connect(self.download_all_works)
self.download_all_btn.setEnabled(False)
self.select_folder_btn = QPushButton("选择下载目录")
self.select_folder_btn.clicked.connect(self.select_download_folder)
work_btn_layout.addWidget(self.work_btn)
work_btn_layout.addWidget(self.download_btn)
work_btn_layout.addWidget(self.download_all_btn)
work_btn_layout.addWidget(self.select_folder_btn)
work_btn_layout.addStretch()
self.work_list = QListWidget()
self.work_list.setSelectionMode(QListWidget.MultiSelection)
self.work_list.itemSelectionChanged.connect(self.update_download_button)
work_layout.addLayout(work_btn_layout)
work_layout.addWidget(self.work_list)
work_group.setLayout(work_layout)
main_layout.addWidget(work_group)
# 下载进度区域
progress_group = QGroupBox("下载进度")
progress_layout = QVBoxLayout()
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
self.progress_label = QLabel("准备就绪")
progress_layout.addWidget(self.progress_label)
progress_layout.addWidget(self.progress_bar)
progress_group.setLayout(progress_layout)
main_layout.addWidget(progress_group)
# 状态栏
status_layout = QHBoxLayout()
self.status_label = QLabel("就绪")
self.folder_label = QLabel(f"下载目录: {self.downloader.download_folder}")
status_layout.addWidget(self.status_label)
status_layout.addStretch()
status_layout.addWidget(self.folder_label)
main_layout.addLayout(status_layout)
self.setLayout(main_layout)
def login(self):
user = self.user_edit.text().strip()
pwd = self.pwd_edit.text().strip()
if not user or not pwd:
QMessageBox.warning(self, "提示", "请输入账号和密码")
return
self.status_label.setText("正在登录...")
QApplication.processEvents()
success, error_msg = self.downloader.login(user, pwd)
if success:
QMessageBox.information(self, "提示", "登录成功")
self.course_btn.setEnabled(True)
self.status_label.setText("已登录")
# 保存设置
if self.remember_checkbox.isChecked():
self.save_settings()
else:
QMessageBox.critical(self, "错误", f"登录失败: {error_msg}")
self.status_label.setText("登录失败")
def save_settings(self):
if self.remember_checkbox.isChecked():
self.settings.setValue("username", self.user_edit.text())
self.settings.setValue("password", self.pwd_edit.text())
self.settings.setValue("remember", True)
else:
self.settings.remove("username")
self.settings.remove("password")
self.settings.setValue("remember", False)
self.settings.setValue("download_folder", self.downloader.download_folder)
def load_settings(self):
if self.settings.value("remember", False, type=bool):
self.user_edit.setText(self.settings.value("username", ""))
self.pwd_edit.setText(self.settings.value("password", ""))
self.remember_checkbox.setChecked(True)
saved_folder = self.settings.value("download_folder", "")
if saved_folder and os.path.exists(saved_folder):
self.downloader.download_folder = saved_folder
self.folder_label.setText(f"下载目录: {saved_folder}")
def get_courses(self):
self.course_list.clear()
courses = self.downloader.get_courses()
for c in courses:
self.course_list.addItem(f"{c['index']}. {c['name']}")
        self.work_btn.setEnabled(bool(courses))
def get_works(self):
selected = self.course_list.currentRow()
if selected < 0:
QMessageBox.warning(self, "提示", "请先选择课程")
return
self.status_label.setText("正在获取作业列表...")
QApplication.processEvents()
course = self.downloader.select_course(selected)
if course:
self.work_list.clear()
self.downloader.get_works(course["url"])
if self.downloader.work_list:
for w in self.downloader.work_list:
self.work_list.addItem(f"{w['title']} - {w['status']}")
self.download_all_btn.setEnabled(True)
self.status_label.setText(f"已获取 {len(self.downloader.work_list)} 个作业")
else:
self.status_label.setText("未找到作业")
def update_download_button(self):
"""根据选中项更新下载按钮状态"""
selected_items = self.work_list.selectedItems()
self.download_btn.setEnabled(len(selected_items) > 0)
def select_download_folder(self):
"""选择下载目录"""
folder = QFileDialog.getExistingDirectory(self, "选择下载目录", self.downloader.download_folder)
if folder:
self.downloader.download_folder = folder
self.folder_label.setText(f"下载目录: {folder}")
self.save_settings()
def download_selected_works(self):
"""下载选中的作业"""
selected_items = self.work_list.selectedItems()
if not selected_items:
QMessageBox.warning(self, "提示", "请先选择要下载的作业")
return
selected_indices = [self.work_list.row(item) for item in selected_items]
self._start_download(selected_indices)
def download_all_works(self):
"""下载全部作业"""
if not self.downloader.work_list:
QMessageBox.warning(self, "提示", "作业列表为空")
return
all_indices = list(range(len(self.downloader.work_list)))
self._start_download(all_indices)
def _start_download(self, indices):
"""直接下载作业(单线程)"""
self.progress_bar.setValue(0)
self.progress_label.setText("准备下载...")
self.status_label.setText("正在下载...")
# 禁用下载按钮,防止重复点击
self.download_btn.setEnabled(False)
self.download_all_btn.setEnabled(False)
# 直接在主线程中下载
results = []
errors = []
for idx in indices:
if 0 <= idx < len(self.downloader.work_list):
work = self.downloader.work_list[idx]
self.progress_label.setText(f"正在下载: {work['title']}")
self.progress_bar.setValue(0)
QApplication.processEvents() # 更新UI
files, error = self.downloader.download_work_file(
work['detail_url'],
progress_callback=lambda filename, progress: self.update_download_progress(filename, progress)
)
if files:
results.append({
"title": work['title'],
"files": files
})
else:
errors.append(f"{work['title']}: {error}")
else:
errors.append(f"无效的作业索引: {idx}")
QApplication.processEvents() # 更新UI
# 下载完成后调用回调函数
self.on_download_complete(results, errors)
def update_download_progress(self, filename, progress):
"""更新下载进度"""
self.progress_bar.setValue(progress)
self.progress_label.setText(f"正在下载: {filename} ({progress}%)")
QApplication.processEvents() # 确保UI更新
def on_download_complete(self, results, errors):
"""下载完成回调"""
# 恢复按钮状态
        self.download_btn.setEnabled(bool(self.work_list.selectedItems()))
self.download_all_btn.setEnabled(bool(self.downloader.work_list))
# 更新状态
self.progress_bar.setValue(100)
self.progress_label.setText("下载完成")
self.status_label.setText("就绪")
# 显示结果
if results:
success_msg = f"成功下载 {len(results)} 个作业"
if errors:
QMessageBox.warning(self, "下载完成",
f"{success_msg}\n但有 {len(errors)} 个错误:\n" + "\n".join(errors))
else:
QMessageBox.information(self, "下载完成", success_msg)
# 询问是否打开下载目录
reply = QMessageBox.question(self, "下载完成", "是否打开下载目录?",
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
os.startfile(self.downloader.download_folder)
elif errors:
QMessageBox.critical(self, "下载失败", "\n".join(errors))
if __name__ == "__main__":
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec_())
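# 不启动 GUI 时,也可以按下面的顺序直接调用下载器(示意代码:账号、密码为占位符,
# 且假设课程列表与作业列表均非空):
#   dl = ChaoXingWorkDownloader()
#   ok, msg = dl.login("账号", "密码")
#   courses = dl.get_courses()
#   course = dl.select_course(0)
#   works = dl.get_works(course["url"])
#   files, err = dl.download_work_file(works[0]["detail_url"])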