# [Python] 纯文本查看 复制代码  (forum "plain-text view / copy code" paste header; not part of the program)
#version 1.2
import json
import os
import random
import re
import sys
import time
from urllib.parse import quote, urljoin, urlparse

import requests
import urllib3
from bs4 import BeautifulSoup
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QFont, QIcon
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
                             QLabel, QLineEdit, QPushButton, QListWidget, QListWidgetItem,
                             QProgressBar, QFileDialog, QMessageBox,
                             QStatusBar, QCheckBox)
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from urllib3.exceptions import InsecureRequestWarning

# Disable the SSL warnings that urllib3 raises for verify=False requests.
urllib3.disable_warnings(InsecureRequestWarning)
# Base URL of the target site.
BASE_URL = "http://www.22a5.com"
# Endpoint derived from BASE_URL; MusicDownloaderThread POSTs a song id to it
# to obtain the actual media URL.
API_URL = f"{BASE_URL}/js/play.php"
class SearchThread(QThread):
    """Background thread that scrapes the site's search result pages.

    Signals:
        search_finished (list): emitted once at the end of ``run`` with every
            ``<li>`` element collected (possibly empty), even when the search
            aborted with an unexpected error. It is skipped only when an
            interruption was requested via ``requestInterruption()``.
        search_error (str): emitted for every non-fatal problem (HTTP error,
            retry, missing result container, parse failure).
    """

    search_finished = pyqtSignal(list)  # carries the collected <li> elements
    search_error = pyqtSignal(str)      # carries a human-readable message

    MAX_PAGES = 3         # only the first pages are fetched to limit requests
    MAX_ATTEMPTS = 3      # attempts per page before giving up on it
    REQUEST_TIMEOUT = 15  # seconds, per HTTP request

    def __init__(self, keyword, session):
        """Store the search keyword and the shared ``requests`` session.

        Args:
            keyword: Raw search text typed by the user.
            session: ``requests.Session`` used for every page request.
        """
        super().__init__()
        self.keyword = keyword
        self.session = session
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': BASE_URL,
        }

    def run(self):
        """Fetch the result pages and emit ``search_finished`` with the hits."""
        play_list = []  # every <li> element found so far
        try:
            # Escape the keyword so characters such as '/', '?' or '#' cannot
            # change the structure of the URL path.
            encoded_keyword = quote(self.keyword, safe='')
            for page in range(1, self.MAX_PAGES + 1):
                if self.isInterruptionRequested():
                    break
                # Random delay so the requests are not fired back to back.
                time.sleep(random.uniform(1.0, 3.0))
                play_list.extend(self._fetch_page(encoded_keyword, page))
        except Exception as e:
            self.search_error.emit(f"搜索失败: {str(e)}")
        # Always report back (partial results included) so the UI can leave
        # its "searching" state; stay silent only when asked to stop.
        if not self.isInterruptionRequested():
            self.search_finished.emit(play_list)

    def _fetch_page(self, encoded_keyword, page):
        """Return the ``<li>`` elements of one result page ([] on failure)."""
        query_url = f'{BASE_URL}/so/{encoded_keyword}/{page}.html'
        for attempt in range(self.MAX_ATTEMPTS):
            if self.isInterruptionRequested():
                return []
            is_last_attempt = attempt == self.MAX_ATTEMPTS - 1
            try:
                response = self.session.get(
                    query_url,
                    headers=self.headers,
                    timeout=self.REQUEST_TIMEOUT,
                    verify=False
                )
            except requests.exceptions.RequestException as e:
                self.search_error.emit(
                    f"请求异常(尝试{attempt + 1}/{self.MAX_ATTEMPTS}): {str(e)}")
                if not is_last_attempt:
                    time.sleep(2 ** attempt)  # exponential backoff
                continue

            if response.status_code == 403:
                self.search_error.emit(
                    f"第{page}页访问被拒绝(403),尝试 {attempt+1}/{self.MAX_ATTEMPTS}")
                if not is_last_attempt:
                    time.sleep(2 ** attempt)  # exponential backoff
                continue
            if response.status_code != 200:
                # Any other status is treated as permanent for this page.
                self.search_error.emit(
                    f"第{page}页请求失败,状态码: {response.status_code}")
                return []

            try:
                soup = BeautifulSoup(response.text, 'html.parser')
                container = soup.find('div', class_='play_list')
                if container is None:
                    self.search_error.emit(f"警告:第{page}页未找到play_list容器")
                    return []
                return container.find_all('li')
            except Exception as e:
                self.search_error.emit(f"解析异常: {str(e)}")
                return []
        return []
class MusicDownloaderThread(QThread):
    """Worker thread that downloads a list of songs one after another.

    Signals:
        progress_signal (int, int, int): 1-based index of the current song,
            total number of songs, percentage (0-100) of the current file.
        finished_signal (int, str, bool): 0-based index, file name (or song
            name on failure), and whether the download succeeded.
        error_signal (str): human-readable description of a failure.
        finished_all (): emitted once when ``run`` ends, whatever the reason.
    """

    progress_signal = pyqtSignal(int, int, int)
    finished_signal = pyqtSignal(int, str, bool)
    error_signal = pyqtSignal(str)
    finished_all = pyqtSignal()

    CHUNK_SIZE = 64 * 1024  # bytes read per iteration while streaming

    def __init__(self, music_list, download_path):
        """Prepare the download job.

        Args:
            music_list: Sequence of ``(name, page_url)`` pairs to download.
            download_path: Directory the files are written to.
        """
        super().__init__()
        self.music_list = music_list
        self.download_path = download_path
        self.session = Session()
        self.running = True
        self.headers = {
            'Referer': BASE_URL,
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
        }

    def run(self):
        """Download every queued song, reporting progress through signals."""
        total = len(self.music_list)
        try:
            for idx, (name, url) in enumerate(self.music_list):
                if not self.running:
                    break
                self.progress_signal.emit(idx + 1, total, 0)
                try:
                    filename = self._download_one(idx, total, name, url)
                except Exception as e:
                    # Stay silent when the failure is a consequence of stop().
                    if self.running:
                        self.finished_signal.emit(idx, name, False)
                        self.error_signal.emit(f"下载失败: {name} - {str(e)}")
                else:
                    if filename is not None and self.running:
                        self.finished_signal.emit(idx, filename, True)
        finally:
            # Release pooled connections and always tell the UI we are done,
            # otherwise its download button would stay disabled.
            self.session.close()
            self.finished_all.emit()

    def _download_one(self, idx, total, name, url):
        """Download one song; return its file name, or None if skipped/cancelled."""
        music_id = self._extract_music_id(url)
        if music_id is None:
            return None
        music_url = self._request_music_url(name, music_id)
        if music_url is None:
            return None
        filename = self._build_filename(name, music_id, music_url)
        filepath = os.path.join(self.download_path, filename)
        if self._save_stream(idx, total, music_url, filepath):
            return filename
        return None

    def _extract_music_id(self, url):
        """Return the id taken from a ``.../<id>.html`` page URL, or None."""
        last_segment = urlparse(url).path.split('/')[-1]
        if not last_segment.endswith('.html'):
            self.error_signal.emit(f"URL格式不正确: {url}")
            return None
        music_id = last_segment[:-len('.html')]
        if not music_id:
            self.error_signal.emit(f"无法从URL提取音乐ID: {url}")
            return None
        return music_id

    def _request_music_url(self, name, music_id):
        """Ask the play API for the media URL of *music_id*; None on failure."""
        response = self.session.post(
            API_URL,
            headers=self.headers,
            data={'id': music_id, 'type': 'music'},
            timeout=30,
            verify=False  # SSL verification is disabled
        )
        if response.status_code != 200:
            self.error_signal.emit(f"API请求失败: 状态码 {response.status_code}")
            return None
        try:
            music_data = response.json()
        except ValueError:
            # Every JSON decode error raised by requests (json or simplejson
            # backed, old or new versions) derives from ValueError.
            self.error_signal.emit(f"API返回的不是有效JSON: {response.text[:100]}...")
            return None
        if not isinstance(music_data, dict) or music_data.get('msg') != 1:
            self.error_signal.emit(f"API返回错误: {music_data}")
            return None
        music_url = music_data.get('url')
        if not music_url:
            self.error_signal.emit(f"无法获取下载链接: {name}")
            return None
        return music_url

    def _build_filename(self, name, music_id, music_url):
        """Return a file-system-safe file name for the song."""
        # Drop characters that are illegal in file names, collapse whitespace
        # (including newlines) and trim trailing dots/spaces.
        safe_name = re.sub(r'[\\/*?:"<>|]', "", name)
        safe_name = re.sub(r'\s+', ' ', safe_name).strip().rstrip('. ')
        if not safe_name:
            safe_name = music_id  # the name consisted only of illegal characters
        _, ext = os.path.splitext(urlparse(music_url).path)
        # Missing or implausibly long extension: fall back to the default.
        if not ext or len(ext) > 5:
            ext = '.m4a'
        return f"{safe_name}{ext}"

    def _save_stream(self, idx, total, music_url, filepath):
        """Stream *music_url* into *filepath*; return False when cancelled.

        The data is written to ``<filepath>.part`` first and renamed only after
        the whole file arrived, so an error or a cancel never leaves a
        truncated file under the final name.
        """
        temp_path = filepath + '.part'
        completed = False
        try:
            os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)
            with self.session.get(
                music_url,
                headers={'User-Agent': self.headers['User-Agent']},
                stream=True,
                timeout=60,
                verify=False  # SSL verification is disabled
            ) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                last_progress = 0
                with open(temp_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
                        if not self.running:
                            return False
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size > 0:
                            # Clamp: decoded bytes may exceed content-length.
                            progress = min(100, downloaded * 100 // total_size)
                            # Emit only on change to avoid flooding the GUI.
                            if progress != last_progress:
                                last_progress = progress
                                self.progress_signal.emit(idx + 1, total, progress)
            os.replace(temp_path, filepath)
            completed = True
            if last_progress != 100:
                self.progress_signal.emit(idx + 1, total, 100)
            return True
        finally:
            if not completed and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError:
                    pass  # best effort: a leftover .part file is harmless

    def stop(self):
        """Ask the thread to stop; takes effect at the next chunk or song."""
        self.running = False
class MusicDownloaderApp(QMainWindow):
    """Main window of the music downloader.

    Lets the user search the site, pick results and download them either in
    one sequential worker ("batch" mode) or with one worker per song.
    """

    SETTINGS_FILE = "download_path.txt"  # stores the last chosen directory

    def __init__(self):
        """Build the window, restore the download directory and open a session."""
        super().__init__()
        # Default download directory; D:\music only makes sense on Windows.
        if os.name == "nt":
            self.default_path = "D:\\music"
        else:
            self.default_path = os.path.join(os.path.expanduser("~"), "Music")
        # Window icon is optional: only set when music.ico is present.
        if os.path.exists("music.ico"):
            self.setWindowIcon(QIcon("music.ico"))
        # State that slots rely on is created up front instead of being
        # probed with hasattr() later.
        self.left_label = None          # status-bar label, created lazily
        self.search_thread = None
        self.download_thread = None     # most recently started worker
        self._download_threads = []     # keeps every live worker referenced
        self._active_downloads = 0      # workers that have not reported done
        self._closing = False
        # Restore the saved directory and make sure it is usable.
        self.download_path = self._ensure_download_dir(self.load_saved_path())
        self.init_ui()
        self.setup_connections()
        # Initialise the HTTP session.
        self.session = Session()
        self.session_ask()

    def _ensure_download_dir(self, path):
        """Create *path* if needed; fall back to ``~/Music`` when impossible."""
        try:
            os.makedirs(path, exist_ok=True)
            return path
        except OSError:
            # e.g. the default drive does not exist on this machine.
            fallback = os.path.join(os.path.expanduser("~"), "Music")
            os.makedirs(fallback, exist_ok=True)
            return fallback

    def load_saved_path(self):
        """Return the saved download directory, or the default one."""
        try:
            with open(self.SETTINGS_FILE, "r", encoding="utf-8") as f:
                saved_path = f.read().strip()
            if saved_path and os.path.isdir(saved_path):
                return saved_path
        except (OSError, ValueError):
            # Missing/unreadable file or undecodable content: use the default.
            pass
        return self.default_path

    def save_path_to_file(self, path):
        """Persist *path* so it is restored on the next start."""
        try:
            with open(self.SETTINGS_FILE, "w", encoding="utf-8") as f:
                f.write(path)
        except Exception as e:
            self.show_error(f"保存路径失败: {str(e)}")

    def init_ui(self):
        """Create and lay out all widgets."""
        self.setWindowTitle("音乐下载器")
        self.setGeometry(300, 200, 800, 500)
        # Main layout
        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        main_layout = QVBoxLayout(main_widget)
        # Top row: search box, search button, path chooser, current path
        search_layout = QHBoxLayout()
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("请输入歌曲名称或歌手...")
        search_layout.addWidget(self.search_input)
        self.search_btn = QPushButton("搜索")
        search_layout.addWidget(self.search_btn)
        self.path_btn = QPushButton("选择下载路径")
        search_layout.addWidget(self.path_btn)
        self.path_label = QLabel(f"当前下载路径: {self.download_path}")
        search_layout.addWidget(self.path_label)
        main_layout.addLayout(search_layout)
        # Search results
        result_group = QVBoxLayout()
        result_group.addWidget(QLabel("搜索结果"))
        self.result_list = QListWidget()
        self.result_list.setSelectionMode(QListWidget.ExtendedSelection)  # multi-select
        result_group.addWidget(self.result_list)
        # Download controls
        download_control_layout = QHBoxLayout()
        self.download_btn = QPushButton("下载选中")
        self.select_all_btn = QPushButton("全选")
        download_control_layout.addWidget(self.select_all_btn)
        download_control_layout.addWidget(self.download_btn)
        result_group.addLayout(download_control_layout)
        # Batch download option
        self.batch_download_check = QCheckBox("批量下载模式")
        result_group.addWidget(self.batch_download_check)
        main_layout.addLayout(result_group, 1)
        # Progress bar, hidden until a download starts
        self.progress_bar = QProgressBar()
        self.progress_bar.setVisible(False)
        main_layout.addWidget(self.progress_bar)
        # Status bar with the permanent copyright notice
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        copyright_label = QLabel("该软件仅供学习和娱乐,如有侵权请联系删除")
        copyright_label.setFont(QFont("Arial", 8))
        self.status_bar.addPermanentWidget(copyright_label)

    def setup_connections(self):
        """Wire the buttons to their handlers."""
        self.search_btn.clicked.connect(self.start_search_thread)
        self.download_btn.clicked.connect(self.download_selected)
        self.select_all_btn.clicked.connect(self.select_all_results)
        self.path_btn.clicked.connect(self.select_download_path)

    def session_ask(self):
        """Request the site's home page once through the session."""
        try:
            self.session.get(url=BASE_URL, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
            }, timeout=10, verify=False)  # SSL verification is disabled
        except Exception as e:
            self.show_error(f"初始化失败: {str(e)}")

    def _set_status_text(self, text):
        """Show *text* in the red status-bar label, creating it on first use."""
        if self.left_label is None:
            self.left_label = QLabel()
            self.left_label.setStyleSheet("color: red; font: 8pt 'Arial';")
            self.statusBar().addWidget(self.left_label)
        self.left_label.setText(text)

    def start_search_thread(self):
        """Validate the keyword and start a background search."""
        keyword = self.search_input.text().strip()
        if not keyword:
            self.show_warning("请输入搜索关键词")
            return
        self.result_list.clear()
        self.search_btn.setEnabled(False)
        self.search_btn.setText("搜索中...")
        self._set_status_text("正在搜索,请稍候...")
        # Create and start the search worker.
        self.search_thread = SearchThread(keyword, self.session)
        self.search_thread.search_finished.connect(self.handle_search_results)
        self.search_thread.search_error.connect(self.handle_search_error)
        self.search_thread.start()

    def handle_search_results(self, play_list):
        """Fill the result list from the ``<li>`` elements of a finished search."""
        if self._closing:
            return
        self.search_btn.setEnabled(True)
        self.search_btn.setText("搜索")
        self._set_status_text(f"总共获取到{len(play_list)}个结果")
        if not play_list:
            self.show_info("未找到相关结果")
            return
        for li in play_list:
            try:
                link = li.find('a')
                if link is None:
                    continue
                name = link.text.strip()
                # urljoin copes with absolute and relative hrefs alike.
                href = urljoin(BASE_URL, link['href'])
            except (AttributeError, KeyError, TypeError):
                print(f"无效的<li>元素: {li}")
                continue
            # Number by what is actually listed so skipped items leave no gaps.
            position = self.result_list.count() + 1
            list_item = QListWidgetItem(f"{position}. {name}")
            list_item.setData(Qt.UserRole, (name, href))
            self.result_list.addItem(list_item)

    def handle_search_error(self, error_msg):
        """Log a search problem to the console without interrupting the search."""
        print(error_msg)

    def download_selected(self):
        """Download the selected songs in batch or one-worker-per-song mode."""
        selected_items = self.result_list.selectedItems()
        if not selected_items:
            self.show_warning("请先选择要下载的歌曲")
            return
        download_list = [tuple(item.data(Qt.UserRole)) for item in selected_items]
        if self.batch_download_check.isChecked():
            # Batch mode: one worker handles the whole list sequentially.
            self.start_download_thread(download_list)
        else:
            # Single mode: one worker per song.
            for entry in download_list:
                self.start_download_thread([entry])

    def start_download_thread(self, download_list):
        """Start a worker for *download_list* and show the progress bar."""
        if not download_list:
            return
        # Forget workers that already ended; keep the others referenced so a
        # running QThread is never garbage-collected.
        self._download_threads = [
            t for t in self._download_threads if not t.isFinished()
        ]
        thread = MusicDownloaderThread(download_list, self.download_path)
        thread.progress_signal.connect(self.update_download_progress)
        thread.finished_signal.connect(self.download_finished)
        thread.error_signal.connect(self.show_error)
        thread.finished_all.connect(self.download_finished_all)
        self._download_threads.append(thread)
        self._active_downloads += 1
        self.download_thread = thread
        thread.start()
        # Show the progress bar and block further downloads until done.
        self.progress_bar.setVisible(True)
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)
        self.download_btn.setEnabled(False)

    def update_download_progress(self, current, total, progress):
        """Reflect a worker's progress in the progress bar."""
        self.progress_bar.setFormat(f"下载中: {current}/{total} ({progress}%)")
        self.progress_bar.setValue(progress)

    def download_finished(self, index, filename, success):
        """Notify the user about a successfully downloaded file."""
        if success:
            self.show_info(f"下载完成: {filename}")

    def download_finished_all(self):
        """Reset the UI once the last running worker has reported completion."""
        self._active_downloads = max(0, self._active_downloads - 1)
        if self._closing or self._active_downloads:
            return
        self.progress_bar.setVisible(False)
        self.download_btn.setEnabled(True)
        self.show_info("所有下载任务已完成!")

    def select_download_path(self):
        """Let the user pick the download directory and remember it."""
        path = QFileDialog.getExistingDirectory(
            self,
            "选择下载目录",
            self.download_path,
            QFileDialog.ShowDirsOnly
        )
        if path:
            self.download_path = path
            self.path_label.setText(f"当前下载路径: {path}")
            self.save_path_to_file(path)
            self.show_info(f"下载路径已设置为: {path}")

    def select_all_results(self):
        """Select every entry of the result list."""
        self.result_list.selectAll()

    def show_error(self, message):
        """Show *message* in a modal error dialog."""
        QMessageBox.critical(self, "错误", message)

    def show_warning(self, message):
        """Show *message* in a modal warning dialog."""
        QMessageBox.warning(self, "警告", message)

    def show_info(self, message):
        """Show *message* in a modal information dialog."""
        QMessageBox.information(self, "提示", message)

    def closeEvent(self, event):
        """Stop every background thread before the window closes."""
        self._closing = True
        search_thread = self.search_thread
        if search_thread is not None and search_thread.isRunning():
            # Ask politely first; terminate() is the last resort because it
            # kills the thread at an arbitrary point.
            search_thread.requestInterruption()
            if not search_thread.wait(1000):
                search_thread.terminate()
                search_thread.wait()
        for thread in self._download_threads:
            if thread.isRunning():
                thread.stop()
                thread.wait()
        event.accept()
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # run the event loop; its return code becomes the process exit status.
    qt_app = QApplication(sys.argv)
    main_window = MusicDownloaderApp()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)