好友
阅读权限10
听众
最后登录1970-1-1
|
本帖最后由 qq196796483 于 2026-5-14 15:54 编辑
DJ下载器,拷贝到U盘听歌。我想优化我之前学的内容,再实现一下,其实就是从无到有,再追求完美,进行优化处理{:1_890:} 。纯技术交流学习,欢迎指点哪里不足,或者需要添加什么功能,都可以告诉我,我再进行优化处理
你也可以在我原来的代码基础上进行修改
效果图,选择多个下载用SHIFT+鼠标点击
01.正常请求,同步请求,这个代码的主要作用是请求热门的100首列表,进行下载,但是是同步下载,速度会慢一点,我们要对别人的服务器要温柔一点
主要实现的是自动创建一个歌曲文件夹,过滤掉特殊符号防止无法写入,然后进行下载
li_list = html.xpath("//ul[@id='playlist']/li[position() <= 10]") 我测试的时候只下载了十条
如果想全部下载
把 "//ul[@id='playlist']/li[position() <= 10]"
改写成 "//ul[@id='playlist']/li"
用xpath匹配
[Python] 纯文本查看 复制代码 import re
import os
import requests
from lxml import etree
import time
# --- Global constants and configuration ---
# Headers passed to every requests.get call in this script (browser-like User-Agent only).
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Site root; hrefs scraped from the chart page are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
SAVE_DIR = "歌曲" # name of the folder downloads are saved into
def sanitize_filename(filename: str) -> str:
    """Return *filename* with characters Windows rejects replaced by '_'.

    The nine characters backslash, slash, colon, asterisk, question mark,
    double quote, less-than, greater-than and pipe each become an
    underscore; leading and trailing whitespace is then removed.
    """
    forbidden = '\\/:*?"<>|'
    # One translation table maps every forbidden character to an underscore.
    table = str.maketrans(forbidden, '_' * len(forbidden))
    return filename.translate(table).strip()
def download_music(url: str):
    """Fetch a song detail page and save its audio file into SAVE_DIR.

    The detail page embeds the song title and the audio path in the
    JavaScript variables ``p_n`` and ``danceFilePath``. Both are extracted
    with regular expressions, the title is cleaned with sanitize_filename,
    and the audio is downloaded from AUDIO_BASE_URL.

    Args:
        url: Absolute URL of the song's detail page.

    Returns:
        None. Progress and failures are printed; request and file errors
        are caught so that one bad song does not stop the caller's loop.
    """
    # Create the target folder on first use; exist_ok covers the case where
    # it appears between the check and the call.
    if not os.path.isdir(SAVE_DIR):
        os.makedirs(SAVE_DIR, exist_ok=True)
        print(f"[*] 已创建文件夹: {SAVE_DIR}")

    try:
        # A timeout keeps a stalled server from blocking the run forever.
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"[-] 页面请求失败: {url}, 错误: {e}")
        return
    response.encoding = response.apparent_encoding
    content = response.text

    name_match = re.search(r'var p_n="(.*?)"', content)
    path_match = re.search(r'var danceFilePath="(.*?)"', content)
    if not name_match or not path_match:
        print(f"[-] 页面解析失败: {url}")
        return

    name = sanitize_filename(name_match.group(1))
    dance_file_path = path_match.group(1)
    audio_url = f"{AUDIO_BASE_URL}/{dance_file_path}.m4a"
    file_path = os.path.join(SAVE_DIR, f"{name}.m4a")

    # The original literal was split over two source lines; "\n" restores it.
    print(f"\n[*] 正在下载: {name}")
    try:
        audio_response = requests.get(audio_url, headers=HEADERS, timeout=30)
        # Without this check an HTTP error page would be saved as an .m4a file.
        audio_response.raise_for_status()
        with open(file_path, "wb") as f:
            f.write(audio_response.content)
        print(f"[+] 保存成功至: {file_path}\n")
    except (requests.RequestException, OSError) as e:
        print(f"[!] 下载失败: {name}, 错误: {e}")
def directory_url():
    """Download the songs linked from the site's /top/good.htm chart page.

    The first anchor of each ``<li>`` under ``ul#playlist`` is joined with
    BASE_URL and handed to download_music, one song after another.
    """
    url = f"{BASE_URL}/top/good.htm"
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(url, headers=HEADERS, timeout=10)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)
    # Limited to the first 10 entries; remove the position() predicate to
    # process the whole list.
    li_list = html.xpath("//ul[@id='playlist']/li[position() <= 10]")
    for item in li_list:
        href_list = item.xpath(".//a/@href")
        if href_list:
            download_music(f"{BASE_URL}{href_list[0]}")
if __name__ == '__main__':
    # The start timestamp must not be called "time": rebinding that name
    # replaces the time module with a float, and the second time.time()
    # call then fails with AttributeError.
    time_start = time.time()
    directory_url()
    time_end = time.time()
    print("总共耗时:", time_end - time_start)
既然都实现了100歌曲的下载,那我想搜索下载也是一样的操作,所以我就写了一个搜索下载的
但是我发现它有一个
人气和默认
所以它有两个链接可以选择,我就想可否进行选择呢?我要按人气看一下,也要按默认看一下。我观察了一下链接的变化,发现变动不是很大
人气是在后面加&by=hits
默认是没有那就好办了,用if写就可以啦
实现的代码如下
也是同步
02.搜索下载一页
[Python] 纯文本查看 复制代码 import re
import os
import requests
from lxml import etree
import time
from urllib.parse import quote
# Browser-like headers; the referer points at the site root.
HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36 Edg/148.0.0.0',
'referer': 'https://www.73dj.com'
}
# Site root; hrefs scraped from search results are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
# Shared session that carries HEADERS on every request made through it.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
SAVE_DIR = "歌曲" # name of the folder downloads are saved into
def popularity_directory(url):
    """Collect detail-page links from one search-result page.

    Args:
        url: Full search URL to fetch.

    Returns:
        A list of absolute detail-page URLs, taken from the first anchor of
        each of the first five ``div.wq_11s > ul`` result blocks. Blocks
        without an anchor are skipped.
    """
    response = SESSION.get(url, timeout=10)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)
    directory_list = html.xpath("//div[@class='wq_11s']/ul")[:5]
    print(f"找到 {len(directory_list)} 个目录")
    links = []
    for item in directory_list:
        hrefs = item.xpath(".//a/@href")
        if not hrefs:
            # Indexing [0] on an empty result used to raise IndexError and
            # abort the whole page; skip the block instead.
            continue
        complete_link = BASE_URL + hrefs[0]
        print(complete_link)
        links.append(complete_link)
    return links
def sanitize_filename(filename: str) -> str:
    """Replace characters that Windows forbids in file names with '_'.

    Affected characters: backslash, slash, colon, asterisk, question mark,
    double quote, less-than, greater-than and pipe. Surrounding whitespace
    is stripped from the result.
    """
    # Inside the character class the backslash itself is written as \\.
    forbidden = re.compile(r'[\\/:*?"<>|]')
    cleaned = forbidden.sub('_', filename)
    return cleaned.strip()
def download_music(url: str):
    """Fetch a song detail page and save its audio file into SAVE_DIR.

    The title and audio path are read from the JavaScript variables
    ``p_n`` and ``danceFilePath`` embedded in the page; the title is
    cleaned with sanitize_filename before it is used as a file name.

    Args:
        url: Absolute URL of the song's detail page.

    Returns:
        None. Progress and failures are printed; request and file errors
        are caught so that one bad song does not stop the caller's loop.
    """
    if not os.path.isdir(SAVE_DIR):
        os.makedirs(SAVE_DIR, exist_ok=True)
        print(f" 已创建文件夹: {SAVE_DIR}")

    try:
        # Go through SESSION so the configured headers (including the
        # referer) are sent and connections are reused; the timeout keeps a
        # stalled server from blocking the run.
        response = SESSION.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"[-] 页面请求失败: {url}, 错误: {e}")
        return
    response.encoding = response.apparent_encoding
    content = response.text

    name_match = re.search(r'var p_n="(.*?)"', content)
    path_match = re.search(r'var danceFilePath="(.*?)"', content)
    if not name_match or not path_match:
        print(f"[-] 页面解析失败: {url}")
        return

    name = sanitize_filename(name_match.group(1))
    dance_file_path = path_match.group(1)
    audio_url = f"{AUDIO_BASE_URL}/{dance_file_path}.m4a"
    file_path = os.path.join(SAVE_DIR, f"{name}.m4a")

    print(f" 正在下载: {name}")
    try:
        audio_response = SESSION.get(audio_url, timeout=30)
        # Without this check an HTTP error page would be saved as an .m4a file.
        audio_response.raise_for_status()
        with open(file_path, "wb") as f:
            f.write(audio_response.content)
        print(f"[+] 保存成功至: {file_path}\n")
    except (requests.RequestException, OSError) as e:
        print(f"[!] 下载失败: {name}, 错误: {e}")
def main():
    """Search for a fixed keyword sorted by popularity and download the hits.

    The keyword is GBK-encoded before URL quoting, the search page is
    scraped with popularity_directory, and every returned link is passed to
    download_music in turn.
    """
    keyword = "抖音"
    encoded_str = quote(keyword.encode('gbk'))
    search_url = f"https://www.73dj.com/search.htm?keyword={encoded_str}&by=hits"
    for detail_link in popularity_directory(search_url):
        download_music(detail_link)


if __name__ == '__main__':
    main()
但是只实现了一页的查看,我还是觉得不太满意,所以我实现了翻页处理
也是同步
如下[Python] 纯文本查看 复制代码 import re
import os
import requests
from lxml import etree
import time
from urllib.parse import quote
# Browser-like headers; the referer points at the site root.
HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36 Edg/148.0.0.0',
'referer': 'https://www.73dj.com'
}
# Site root; hrefs scraped from search results are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
# Shared session that carries HEADERS on every request made through it.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
SAVE_DIR = "歌曲" # name of the folder downloads are saved into
def popularity_directory(url):
    """Collect detail-page links and the page count from one search page.

    Args:
        url: Full search URL to fetch.

    Returns:
        A ``(links, page_num)`` tuple. ``links`` is a list of absolute
        detail-page URLs, one per ``div.wq_11s > ul`` block that contains an
        anchor. ``page_num`` is the first run of digits found in the pager
        entry whose text contains "页数", as a string; it is "1" when the
        page has no such entry.
    """
    response = SESSION.get(url, timeout=10)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)

    # The pager entry can be absent (e.g. when there is only one page of
    # results); indexing [0] unconditionally raised IndexError in that case.
    pager_texts = html.xpath("//ul[@class='manu']/a[contains(text(), '页数')]/text()")
    page_digits = re.search(r'\d+', pager_texts[0]) if pager_texts else None
    page_num = page_digits.group() if page_digits else "1"
    print(page_num)

    directory_list = html.xpath("//div[@class='wq_11s']/ul")
    print(f"找到 {len(directory_list)} 个目录")
    links = []
    for item in directory_list:
        hrefs = item.xpath(".//a/@href")
        if not hrefs:
            # A block without an anchor is skipped instead of aborting the page.
            continue
        complete_link = BASE_URL + hrefs[0]
        print(complete_link)
        links.append(complete_link)
    return links, page_num
def sanitize_filename(filename: str) -> str:
    """Swap characters that are illegal in Windows file names for '_'.

    Backslash, slash, colon, asterisk, question mark, double quote,
    less-than, greater-than and pipe are replaced one for one; leading and
    trailing whitespace is removed afterwards.
    """
    illegal = set('\\/:*?"<>|')
    rebuilt = ''.join('_' if ch in illegal else ch for ch in filename)
    return rebuilt.strip()
def download_music(url: str):
    """Fetch a song detail page and save its audio file into SAVE_DIR.

    The title and audio path are read from the JavaScript variables
    ``p_n`` and ``danceFilePath`` embedded in the page; the title is
    cleaned with sanitize_filename before it is used as a file name.

    Args:
        url: Absolute URL of the song's detail page.

    Returns:
        None. Progress and failures are printed; request and file errors
        are caught so that one bad song does not stop the caller's loop.
    """
    if not os.path.isdir(SAVE_DIR):
        os.makedirs(SAVE_DIR, exist_ok=True)
        print(f" 已创建文件夹: {SAVE_DIR}")

    try:
        # Go through SESSION so the configured headers (including the
        # referer) are sent and connections are reused; the timeout keeps a
        # stalled server from blocking the run.
        response = SESSION.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"[-] 页面请求失败: {url}, 错误: {e}")
        return
    response.encoding = response.apparent_encoding
    content = response.text

    name_match = re.search(r'var p_n="(.*?)"', content)
    path_match = re.search(r'var danceFilePath="(.*?)"', content)
    if not name_match or not path_match:
        print(f"[-] 页面解析失败: {url}")
        return

    name = sanitize_filename(name_match.group(1))
    dance_file_path = path_match.group(1)
    audio_url = f"{AUDIO_BASE_URL}/{dance_file_path}.m4a"
    file_path = os.path.join(SAVE_DIR, f"{name}.m4a")

    print(f" 正在下载: {name}")
    try:
        audio_response = SESSION.get(audio_url, timeout=30)
        # Without this check an HTTP error page would be saved as an .m4a file.
        audio_response.raise_for_status()
        with open(file_path, "wb") as f:
            f.write(audio_response.content)
        print(f"[+] 保存成功至: {file_path}\n")
    except (requests.RequestException, OSError) as e:
        print(f"[!] 下载失败: {name}, 错误: {e}")
def main():
    """Search a fixed keyword, walk every result page and download all songs.

    The user picks the sort order on stdin ("1" popularity, "2" default;
    anything else falls back to popularity). The first page yields the page
    count; the remaining pages are fetched with a short pause between
    requests, and every collected link is passed to download_music.
    """
    chinese_str = "抖音"
    encoded_str = quote(chinese_str.encode('gbk'))

    # ---- choose the sort order ----
    print("请选择排序方式:")
    print("1. 人气排列")
    print("2. 默认排列")
    choice = input("请输入选项 (1/2): ").strip()
    if choice == "2":
        # Default order keeps an empty "by" parameter, so paging becomes ...&by=&page=2
        base_url = f"https://www.73dj.com/search.htm?keyword={encoded_str}&by="
        print(" 已选择:默认排列")
    else:
        if choice == "1":
            print(" 已选择:人气排列")
        else:
            print("[!] 输入无效,默认使用人气排列")
        base_url = f"https://www.73dj.com/search.htm?keyword={encoded_str}&by=hits"

    # ---- step 1: page count plus the links of page 1 ----
    first_page_url = f"{base_url}&page=1"
    links, page_num_str = popularity_directory(first_page_url)
    max_page = int(page_num_str)
    print(f" 搜索结果共 {max_page} 页")
    all_links = list(links)

    # ---- step 2: collect the remaining pages ----
    for page in range(2, max_page + 1):
        page_url = f"{base_url}&page={page}"
        print(f"\n 正在获取第 {page} 页: {page_url}")
        try:
            page_links, _ = popularity_directory(page_url)
        except Exception as e:
            # Best effort: a failed page is reported and skipped.
            print(f"[!] 第 {page} 页获取失败: {e}")
            continue
        all_links.extend(page_links)
        time.sleep(0.5)

    # ---- step 3: de-duplicate and download ----
    # dict.fromkeys drops duplicates while keeping the order in which the
    # links were found; list(set(...)) shuffled the download order.
    all_links = list(dict.fromkeys(all_links))
    print(f"\n[+] 共收集到 {len(all_links)} 首歌曲,开始下载...")
    for link in all_links:
        download_music(link)
        time.sleep(0.3)
    print(" 全部下载完成!")


if __name__ == '__main__':
    main()
这个是输入关键词就可以搜索DJ歌曲了
03.既然可以实现了同步那异步呢,是不是也可以考虑呢
异步我的理解是,同一时间同时干多件事
我这里写了两个,一个是协程,一个是多线程
区别是:协程是在同一个线程里不断地切换任务
多线程是在一个进程里面开多个线程同时干活
我设置最大为5
代码如下
这个是多线程
[Python] 纯文本查看 复制代码 import re
import os
import time
import requests
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
# --- Global constants and configuration ---
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Site root; hrefs scraped from the chart page are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
# Name of the folder downloads are saved into.
SAVE_DIR = "歌曲"
# Worker count handed to ThreadPoolExecutor.
MAX_WORKERS = 5
# [Optimization 3] Pre-compile the regular expressions used for every detail page.
NAME_PATTERN = re.compile(r'var p_n="(.*?)"')
PATH_PATTERN = re.compile(r'var danceFilePath="(.*?)"')
# [Optimization 2] One module-level Session so requests can reuse connections.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
def sanitize_filename(filename: str) -> str:
    """Replace characters Windows rejects in file names with '_' and trim.

    Splitting on each forbidden character and joining the pieces with an
    underscore substitutes every occurrence one for one.
    """
    pieces = re.split(r'[\\/:*?"<>|]', filename)
    return '_'.join(pieces).strip()
def download_music(url: str):
    """Parse a song detail page and stream its audio file into SAVE_DIR.

    Called from worker threads. The title and audio path are extracted
    with NAME_PATTERN and PATH_PATTERN, and the audio is written to disk in
    8 KB chunks.

    Args:
        url: Absolute URL of the song's detail page.

    Returns:
        None. Every failure is caught and printed so that one song cannot
        take down the pool.
    """
    # Label used in the failure message; switched to the song title once known.
    # This replaces the fragile "'name' in locals()" lookup.
    label = url
    try:
        # exist_ok makes concurrent first calls from several threads safe
        # without a separate exists() check.
        os.makedirs(SAVE_DIR, exist_ok=True)

        response = SESSION.get(url, timeout=10)
        response.encoding = response.apparent_encoding
        content = response.text

        name_match = NAME_PATTERN.search(content)
        path_match = PATH_PATTERN.search(content)
        if not name_match or not path_match:
            print(f"[-] 页面解析失败: {url}")
            return

        name = sanitize_filename(name_match.group(1))
        label = name
        dance_file_path = path_match.group(1)
        audio_url = f"{AUDIO_BASE_URL}/{dance_file_path}.m4a"
        file_path = os.path.join(SAVE_DIR, f"{name}.m4a")
        print(f" 准备下载: {name}")

        # [Optimization 1] stream=True avoids holding the whole file in memory.
        with SESSION.get(audio_url, stream=True, timeout=30) as audio_response:
            # Raise on any HTTP error status instead of saving the error body.
            audio_response.raise_for_status()
            with open(file_path, "wb") as f:
                for chunk in audio_response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        print(f"[+] 保存成功至: {file_path}")
    except Exception as e:
        print(f"[!] 处理失败 ({label}): {e}")
def directory_url():
    """Scrape the /top/good.htm chart and download its songs in a thread pool.

    The first anchor of each of the first ten ``<li>`` entries under
    ``ul#playlist`` is turned into an absolute URL; download_music is then
    run for every URL on MAX_WORKERS threads.
    """
    url = f"{BASE_URL}/top/good.htm"
    # Same timeout as the detail-page requests; without one a stalled server
    # blocks the script before any worker starts.
    response = SESSION.get(url, timeout=10)
    response.encoding = response.apparent_encoding
    html = etree.HTML(response.text)
    li_list = html.xpath("//ul[@id='playlist']/li[position() <= 10]")

    urls_to_download = []
    for item in li_list:
        href_list = item.xpath(".//a/@href")
        if href_list:
            urls_to_download.append(f"{BASE_URL}{href_list[0]}")

    print(f" 共获取到 {len(urls_to_download)} 首歌曲,启动多线程下载...\n")
    # Leaving the with-block waits for all submitted downloads to finish.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        executor.map(download_music, urls_to_download)
    print("\n 所有下载任务已完成!")
if __name__ == '__main__':
    # Time the whole scrape-and-download run.
    started_at = time.time()
    directory_url()
    finished_at = time.time()
    print(f" 总耗时:{finished_at - started_at:.2f} 秒")
热门一百首的多线程
3.1 热门一百首的协程
[Python] 纯文本查看 复制代码 import re
import os
import asyncio
from datetime import time
import time
import aiohttp
from lxml import etree
# --- Global constants and configuration ---
# Headers passed explicitly to every session.get call in this script.
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Site root; hrefs scraped from the chart page are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
SAVE_DIR = "歌曲" # name of the folder downloads are saved into
def sanitize_filename(filename: str) -> str:
    """Replace characters that Windows forbids in file names with '_'.

    Each of backslash, slash, colon, asterisk, question mark, double quote,
    less-than, greater-than and pipe is substituted in turn, then the
    result is stripped of surrounding whitespace.
    """
    cleaned = filename
    for forbidden_char in '\\/:*?"<>|':
        cleaned = cleaned.replace(forbidden_char, '_')
    return cleaned.strip()
async def download_music(session: aiohttp.ClientSession, semaphore: asyncio.Semaphore, url: str):
    """Coroutine: parse one detail page and save its audio into SAVE_DIR.

    Args:
        session: Shared aiohttp session used for both requests.
        semaphore: Held for the whole task to cap concurrent downloads.
        url: Absolute URL of the song's detail page.

    Any exception is caught and printed together with the page URL.
    """
    async with semaphore:
        try:
            # Step 1: fetch the detail page, decoded as GBK with bad bytes dropped.
            async with session.get(url, headers=HEADERS) as page:
                page_text = await page.text(encoding='gbk', errors='ignore')

            title_match = re.search(r'var p_n="(.*?)"', page_text)
            audio_match = re.search(r'var danceFilePath="(.*?)"', page_text)
            if not (title_match and audio_match):
                print(f"[-] 页面解析失败: {url}")
                return

            # Step 2: build the audio URL and the local target path.
            name = sanitize_filename(title_match.group(1))
            audio_url = f"{AUDIO_BASE_URL}/{audio_match.group(1)}.m4a"
            file_path = os.path.join(SAVE_DIR, f"{name}.m4a")
            print(f" 正在获取并准备下载: {name}")

            # Step 3: stream the audio to disk in 64 KB chunks.
            async with session.get(audio_url, headers=HEADERS) as audio_response:
                if audio_response.status != 200:
                    print(f"[!] 下载失败: {name}, HTTP状态码: {audio_response.status}")
                    return
                with open(file_path, "wb") as out_file:
                    async for piece in audio_response.content.iter_chunked(1024 * 64):
                        out_file.write(piece)
                print(f"[+] 保存成功至: {file_path}")
        except Exception as e:
            print(f"[!] 任务失败: {url}, 错误: {e}")
async def directory_url():
    """Coroutine: scrape the /top/good.htm chart and download its songs concurrently.

    Creates SAVE_DIR up front, reads the first ten ``<li>`` entries under
    ``ul#playlist`` and starts one download_music task per entry that has
    an anchor. A semaphore of 5 limits how many tasks run at once.
    """
    # Create the folder once here so the download tasks need not check it.
    if not os.path.exists(SAVE_DIR):
        os.makedirs(SAVE_DIR)
        print(f" 已创建文件夹: {SAVE_DIR}")

    chart_url = f"{BASE_URL}/top/good.htm"
    limiter = asyncio.Semaphore(5)

    # One session is shared by the chart request and every download task.
    async with aiohttp.ClientSession() as session:
        async with session.get(chart_url, headers=HEADERS) as chart_response:
            chart_text = await chart_response.text(encoding='gbk', errors='ignore')
        tree = etree.HTML(chart_text)

        # Ten entries, so that the limit of 5 concurrent tasks is actually exercised.
        pending = []
        for entry in tree.xpath("//ul[@id='playlist']/li[position() <= 10]"):
            hrefs = entry.xpath(".//a/@href")
            if not hrefs:
                continue
            detail_url = f"{BASE_URL}{hrefs[0]}"
            pending.append(asyncio.create_task(download_music(session, limiter, detail_url)))

        # Wait here, inside the session context, until every task is done.
        if pending:
            await asyncio.gather(*pending)

    print("\n[√] 所有下载任务已完成!")
if __name__ == '__main__':
    started_at = time.time()
    # On Windows, switch to the selector event loop policy; the original
    # author did this to avoid "RuntimeError: Event loop is closed".
    running_on_windows = os.name == 'nt'
    if running_on_windows:
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # Run the top-level coroutine to completion.
    asyncio.run(directory_url())
    finished_at = time.time()
    print(f" 总耗时: {finished_at - started_at:.2f} 秒")
到这里你以为就结束了吗?没有
我在想我都可以实现了为什么不做一个页面出来呢
所以我就写了一个界面
[Python] 纯文本查看 复制代码 import re
import os
import requests
from lxml import etree
import time
from urllib.parse import quote
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog # 新增 filedialog 导入
import threading
# ================= Configuration and module-level state =================
# Browser-like headers; the referer points at the site root.
HEADERS = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36 Edg/148.0.0.0',
'referer': 'https://www.73dj.com'
}
# Site root; hrefs scraped from search results are appended to it.
BASE_URL = "https://www.73dj.com"
# Prefix combined with a page's danceFilePath value and ".m4a" to build the audio URL.
AUDIO_BASE_URL = "https://p21.72djapp.cn/m4adj"
# Shared session that carries HEADERS on every request made through it.
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
# Theme colours used by the ttk styles and widgets.
COLOR_BG = "#FFF8F0"
COLOR_PRIMARY = "#FF8C00"
COLOR_SECONDARY = "#FFA500"
COLOR_TEXT = "#333333"
COLOR_WHITE = "#FFFFFF"
class MusicDownloaderApp:
    """Tkinter front end for searching 73dj.com and downloading selected songs.

    Search results are shown one page at a time in a Treeview. Page fetches
    and downloads run on daemon threads; widget updates requested by those
    threads are scheduled onto the Tk event loop with ``root.after``.
    """

    # Used only to compute the running row number shown in the first column.
    # NOTE(review): assumes the site returns 25 results per page - confirm.
    PAGE_SIZE = 25

    def __init__(self, root):
        """Build the window, initialise state and create the default save folder.

        Args:
            root: The ``tk.Tk`` root window that hosts the application.
        """
        self.root = root
        self.root.title("小橙子音乐下载器 Pro - 分页版")
        self.root.geometry("850x720")
        self.root.configure(bg=COLOR_BG)

        # Maps a Treeview item id to the detail-page URL of that row.
        self.song_map = {}
        self.is_searching = False

        # Default save folder: "歌曲" under the current working directory.
        self.save_dir = os.path.abspath("歌曲")
        os.makedirs(self.save_dir, exist_ok=True)

        # Paging state of the current search.
        self.current_keyword = ""
        self.current_sort = ""
        self.current_page = 1
        self.max_page = 1

        self.setup_styles()
        self.create_widgets()
        self.log(f" 初始下载文件夹: {self.save_dir}")

    def setup_styles(self):
        """Configure the ttk styles (clam theme plus the colour constants)."""
        style = ttk.Style()
        style.theme_use('clam')
        style.configure("TFrame", background=COLOR_BG)
        style.configure("TLabelframe", background=COLOR_BG, foreground=COLOR_PRIMARY,
                        font=("微软雅黑", 10, "bold"))
        style.configure("TLabelframe.Label", background=COLOR_BG, foreground=COLOR_PRIMARY)
        style.configure("TLabel", background=COLOR_BG, foreground=COLOR_TEXT, font=("微软雅黑", 10))
        style.configure("TRadiobutton", background=COLOR_BG, foreground=COLOR_TEXT,
                        font=("微软雅黑", 10))
        style.configure("Primary.TButton", background=COLOR_PRIMARY, foreground=COLOR_WHITE,
                        font=("微软雅黑", 10, "bold"), padding=5)
        style.map("Primary.TButton",
                  background=[('active', COLOR_SECONDARY), ('disabled', '#D3D3D3')])
        style.configure("Treeview", background=COLOR_WHITE, foreground=COLOR_TEXT, rowheight=25,
                        fieldbackground=COLOR_WHITE, font=("微软雅黑", 9))
        style.map('Treeview', background=[('selected', '#FFE4B5')])
        style.configure("Treeview.Heading", background=COLOR_SECONDARY, foreground=COLOR_WHITE,
                        font=("微软雅黑", 10, "bold"))

    def create_widgets(self):
        """Create the control bar, the result table with pager, and the log box."""
        # ---------------- top control area ----------------
        control_frame = ttk.Frame(self.root, padding=15)
        control_frame.pack(fill=tk.X)

        # Row 0: search criteria and action buttons.
        ttk.Label(control_frame, text="搜索关键词:").grid(
            row=0, column=0, padx=(0, 5), pady=5, sticky=tk.W)
        self.keyword_entry = ttk.Entry(control_frame, width=30, font=("微软雅黑", 10))
        self.keyword_entry.grid(row=0, column=1, padx=5, pady=5, sticky=tk.W)
        self.keyword_entry.insert(0, "抖音")

        ttk.Label(control_frame, text="排序方式:").grid(
            row=0, column=2, padx=(15, 5), pady=5, sticky=tk.W)
        # The variable's value is used directly as the "by" query parameter.
        self.sort_var = tk.StringVar(value="hits")
        ttk.Radiobutton(control_frame, text="人气排列", variable=self.sort_var,
                        value="hits").grid(row=0, column=3, padx=5)
        ttk.Radiobutton(control_frame, text="默认排列", variable=self.sort_var,
                        value="").grid(row=0, column=4, padx=5)

        self.search_btn = ttk.Button(control_frame, text="🔍 开始搜索", style="Primary.TButton",
                                     command=self.trigger_new_search)
        self.search_btn.grid(row=0, column=5, padx=(20, 10), pady=5)
        self.download_btn = ttk.Button(control_frame, text="⬇️ 下载选中", style="Primary.TButton",
                                       command=self.start_download)
        self.download_btn.grid(row=0, column=6, padx=5, pady=5)

        # Row 1: save-folder display and chooser.
        ttk.Label(control_frame, text="保存目录:").grid(
            row=1, column=0, padx=(0, 5), pady=5, sticky=tk.W)
        self.dir_var = tk.StringVar(value=self.save_dir)
        self.dir_entry = ttk.Entry(control_frame, textvariable=self.dir_var, state='readonly',
                                   width=50, font=("微软雅黑", 9))
        self.dir_entry.grid(row=1, column=1, columnspan=4, padx=5, pady=5, sticky=tk.W)
        self.select_dir_btn = ttk.Button(control_frame, text="📁 选择目录",
                                         style="Primary.TButton", command=self.select_directory)
        self.select_dir_btn.grid(row=1, column=5, padx=(20, 10), pady=5, sticky=tk.W)

        # ---------------- middle result list ----------------
        list_frame = ttk.LabelFrame(self.root, text="搜索结果 (可多选)", padding=10)
        list_frame.pack(fill=tk.BOTH, expand=True, padx=15, pady=5)

        tree_frame = ttk.Frame(list_frame)
        tree_frame.pack(fill=tk.BOTH, expand=True)
        columns = ("id", "name", "status")
        self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings",
                                 selectmode="extended")
        self.tree.heading("id", text="序号")
        self.tree.heading("name", text="歌曲名称")
        self.tree.heading("status", text="状态")
        self.tree.column("id", width=50, anchor=tk.CENTER)
        self.tree.column("name", width=500, anchor=tk.W)
        self.tree.column("status", width=150, anchor=tk.CENTER)
        scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL, command=self.tree.yview)
        self.tree.configure(yscroll=scrollbar.set)
        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        # Pager bar below the table.
        page_frame = ttk.Frame(list_frame)
        page_frame.pack(fill=tk.X, pady=(10, 0))
        self.prev_btn = ttk.Button(page_frame, text="◀ 上一页", style="Primary.TButton",
                                   command=self.on_prev_page, state=tk.DISABLED)
        self.prev_btn.pack(side=tk.LEFT, padx=10)
        self.page_label = ttk.Label(page_frame, text="暂无数据", font=("微软雅黑", 10, "bold"),
                                    foreground=COLOR_PRIMARY)
        self.page_label.pack(side=tk.LEFT, expand=True)
        self.next_btn = ttk.Button(page_frame, text="下一页 ▶", style="Primary.TButton",
                                   command=self.on_next_page, state=tk.DISABLED)
        self.next_btn.pack(side=tk.RIGHT, padx=10)

        # ---------------- bottom log area ----------------
        log_frame = ttk.LabelFrame(self.root, text="运行日志", padding=10)
        log_frame.pack(fill=tk.X, padx=15, pady=(5, 15))
        self.log_text = scrolledtext.ScrolledText(log_frame, height=6, font=("Consolas", 9),
                                                  bg="#2B2B2B", fg="#A9B7C6")
        self.log_text.pack(fill=tk.X, expand=True)
        # Read-only for the user; _append_log enables it briefly to write.
        self.log_text.config(state=tk.DISABLED)

    # ================= helpers =================
    def log(self, message):
        """Schedule *message* to be appended to the log box on the Tk loop."""
        self.root.after(0, self._append_log, message)

    def _append_log(self, message):
        """Append one timestamped line to the log box and scroll to the end."""
        self.log_text.config(state=tk.NORMAL)
        time_str = time.strftime("[%H:%M:%S] ")
        self.log_text.insert(tk.END, time_str + message + "\n")
        self.log_text.see(tk.END)
        self.log_text.config(state=tk.DISABLED)

    def update_tree_status(self, item_id, status_text):
        """Schedule an update of the "status" column of row *item_id*."""
        self.root.after(0, self._set_tree_status, item_id, status_text)

    def _set_tree_status(self, item_id, status_text):
        """Write the status cell, ignoring rows that no longer exist."""
        # load_page_data deletes all rows when the page changes; a download
        # that is still running may then report on a row that is gone, and
        # Treeview.set would raise TclError for it.
        if self.tree.exists(item_id):
            self.tree.set(item_id, column="status", value=status_text)

    def sanitize_filename(self, filename: str) -> str:
        """Replace characters Windows forbids in file names with '_' and strip."""
        illegal_chars = r'[\\/:*?"<>|]'
        return re.sub(illegal_chars, '_', filename).strip()

    # ================= save-folder selection =================
    def select_directory(self):
        """Ask for a folder and, unless cancelled, make it the save folder."""
        selected_dir = filedialog.askdirectory(title="选择音乐保存目录", initialdir=self.save_dir)
        if selected_dir:  # empty when the dialog was cancelled
            self.save_dir = os.path.abspath(selected_dir)
            self.dir_var.set(self.save_dir)
            self.log(f" 下载目录已更改为: {self.save_dir}")

    # ================= search and paging =================
    def trigger_new_search(self):
        """Start a new search from page 1 using the entry and sort selection."""
        keyword = self.keyword_entry.get().strip()
        if not keyword:
            messagebox.showwarning("提示", "请输入搜索关键词!")
            return
        self.current_keyword = keyword
        self.current_sort = self.sort_var.get()
        self.current_page = 1
        self.max_page = 1
        self.load_page_data()

    def on_prev_page(self):
        """Load the previous page unless already on the first one."""
        if self.current_page > 1:
            self.current_page -= 1
            self.load_page_data()

    def on_next_page(self):
        """Load the next page unless already on the last one."""
        if self.current_page < self.max_page:
            self.current_page += 1
            self.load_page_data()

    def load_page_data(self):
        """Clear the table and fetch the current page on a background thread."""
        if self.is_searching:
            self.log("[!] 任务正在执行,请稍后...")
            return
        self.is_searching = True
        self.search_btn.config(state=tk.DISABLED)
        self.prev_btn.config(state=tk.DISABLED)
        self.next_btn.config(state=tk.DISABLED)
        for item in self.tree.get_children():
            self.tree.delete(item)
        self.song_map.clear()
        threading.Thread(target=self.fetch_page_thread, daemon=True).start()

    def fetch_page_thread(self):
        """Worker: fetch and parse one result page, then restore the controls."""
        try:
            encoded_str = quote(self.current_keyword.encode('gbk'))
            base_url = f"https://www.73dj.com/search.htm?keyword={encoded_str}&by={self.current_sort}"
            page_url = f"{base_url}&page={self.current_page}"
            sort_name = "人气排列" if self.current_sort == "hits" else "默认排列"
            self.log(f" 获取 [{self.current_keyword}] 的第 {self.current_page} 页数据 ({sort_name})...")

            response = SESSION.get(page_url, timeout=10)
            response.encoding = response.apparent_encoding
            html = etree.HTML(response.text)

            # The pager entry may be missing; max_page then keeps its
            # current value.
            pager_texts = html.xpath("//ul[@class='manu']/a[contains(text(), '页数')]/text()")
            page_digits = re.search(r'\d+', pager_texts[0]) if pager_texts else None
            if page_digits:
                self.max_page = int(page_digits.group())

            directory_list = html.xpath("//div[@class='wq_11s']/ul")
            if not directory_list:
                self.log("[-] 未找到任何结果或解析失败。")
            else:
                song_count = 0
                for item in directory_list:
                    try:
                        a_tags = item.xpath(".//a")
                        if not a_tags:
                            continue
                        link = a_tags[0].attrib.get('href', '')
                        if not link:
                            continue
                        complete_link = BASE_URL + link
                        # The displayed name is all non-blank text of the block joined together.
                        clean_texts = [t.strip() for t in item.xpath(".//text()") if t.strip()]
                        song_name = "".join(clean_texts).strip()
                        if not song_name:
                            song_name = f"未知歌曲_{self.current_page}_{song_count}"
                        song_count += 1
                        display_id = (self.current_page - 1) * self.PAGE_SIZE + song_count
                        self.root.after(0, self._insert_to_tree, display_id, song_name,
                                        complete_link)
                    except Exception as e:
                        # Keep going with the other rows, but report the skip
                        # instead of dropping it silently.
                        self.log(f"[!] 跳过无法解析的条目: {e}")
                        continue
                self.log(f"[+] 第 {self.current_page} 页加载完成,共解析到 {song_count} 首歌曲。")
        except Exception as e:
            self.log(f"[!] 网络请求或解析发生异常: {e}")
        finally:
            self.is_searching = False
            self.root.after(0, self._restore_ui_state)

    def _insert_to_tree(self, count, name, url):
        """Add one row to the table and remember its detail-page URL."""
        item_id = self.tree.insert("", tk.END, values=(count, name, "待下载"))
        self.song_map[item_id] = url

    def _restore_ui_state(self):
        """Re-enable search and set the pager label/buttons for the current page."""
        self.search_btn.config(state=tk.NORMAL)
        self.page_label.config(text=f"第 {self.current_page} 页 / 共 {self.max_page} 页")
        self.prev_btn.config(state=tk.NORMAL if self.current_page > 1 else tk.DISABLED)
        self.next_btn.config(
            state=tk.NORMAL if self.current_page < self.max_page else tk.DISABLED)

    # ================= download =================
    def start_download(self):
        """Download the selected rows on a background thread."""
        selected_items = self.tree.selection()
        if not selected_items:
            messagebox.showinfo("提示", "请先在列表中选中要下载的歌曲")
            return
        # Recreate the save folder if it was removed since start-up.
        try:
            os.makedirs(self.save_dir, exist_ok=True)
        except OSError as e:
            messagebox.showerror("错误", f"无法创建保存目录:\n{e}")
            return
        self.log(f" 准备下载 {len(selected_items)} 首歌曲...")
        self.download_btn.config(state=tk.DISABLED)
        threading.Thread(target=self.download_thread, args=(selected_items,),
                         daemon=True).start()

    def download_thread(self, selected_items):
        """Worker: resolve and download every selected row, one at a time.

        Args:
            selected_items: Treeview item ids captured when the download was
                started. Ids without an entry in ``song_map`` are skipped.
        """
        try:
            for item_id in selected_items:
                url = self.song_map.get(item_id)
                if not url:
                    continue
                self.update_tree_status(item_id, "解析中...")
                try:
                    # Inside the try: the row may have been deleted by a page
                    # change, in which case this lookup raises.
                    song_name = self.tree.item(item_id, "values")[1]

                    response = SESSION.get(url, timeout=10)
                    response.encoding = response.apparent_encoding
                    content = response.text
                    name_match = re.search(r'var p_n="(.*?)"', content)
                    path_match = re.search(r'var danceFilePath="(.*?)"', content)
                    if not name_match or not path_match:
                        self.log(f"[-] 解析失败跳过: {song_name}")
                        self.update_tree_status(item_id, "❌ 解析失败")
                        continue

                    safe_name = self.sanitize_filename(name_match.group(1))
                    dance_file_path = path_match.group(1)
                    audio_url = f"{AUDIO_BASE_URL}/{dance_file_path}.m4a"
                    file_path = os.path.join(self.save_dir, f"{safe_name}.m4a")
                    if os.path.exists(file_path):
                        self.log(f" 文件已存在,跳过: {safe_name}.m4a")
                        self.update_tree_status(item_id, "✅ 已存在")
                        continue

                    self.update_tree_status(item_id, "⬇️ 下载中...")
                    self.log(f" 正在下载: {safe_name}")
                    audio_response = SESSION.get(audio_url, timeout=15)
                    # Without this check an HTTP error page would be saved as
                    # an .m4a file and reported as a success.
                    audio_response.raise_for_status()
                    with open(file_path, "wb") as f:
                        f.write(audio_response.content)
                    self.log(f"[+] 保存成功: {safe_name}.m4a")
                    self.update_tree_status(item_id, "✅ 下载完成")
                except Exception as e:
                    self.log(f"[!] 下载失败: {e}")
                    self.update_tree_status(item_id, "❌ 下载报错")
                # Short pause between songs to go easy on the server.
                time.sleep(0.5)
            self.log(" 选中的任务已处理完毕!")
        finally:
            # Always give the button back, even if the loop died unexpectedly.
            self.root.after(0, lambda: self.download_btn.config(state=tk.NORMAL))
if __name__ == '__main__':
    # Create the root window, attach the application to it, and hand
    # control to the Tk event loop until the window is closed.
    window = tk.Tk()
    application = MusicDownloaderApp(window)
    window.mainloop()
实现的是如图
实现功能
我已经实现了打包但是不知道咋发
欢迎提示
这样子嘛
DJ下载器.rar - 蓝奏云
https://wwbjp.lanzouw.com/iMcES3pf1hda |
免费评分
-
查看全部评分
|