# M3U8 batch downloader with a Tkinter GUI.
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import threading
import queue
import os
import time
import requests
import m3u8
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
# -------------------------- Configuration --------------------------
# Default number of worker threads used to download segments.
DEFAULT_THREAD_NUM = 8
# Timeout (seconds) passed to the playlist, key and segment requests.
TIMEOUT = 15
# HTTP headers sent with those requests.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows 10; Win64; x64) "
        "AppleWebKit/537.36"
    ),
}
# Initial save directory: the "Desktop" folder inside the user's home.
_HOME_DIR = os.path.expanduser("~")
DEFAULT_PATH = os.path.join(_HOME_DIR, "Desktop")
# -------------------------------------------------------------------
class M3U8Downloader(tk.Tk):
    """Tkinter window that batch-downloads M3U8 (HLS) playlists.

    Each playlist's segments are fetched by a thread pool, optionally
    AES-CBC decrypted, written to a temporary directory and finally
    concatenated into one file with an ``.mp4`` extension.

    Threading model: Tk widgets and Tk variables are only touched from the
    main thread.  Worker threads communicate with the GUI exclusively by
    putting ``(task, data)`` tuples on ``self.msg_queue``, which
    ``check_queue`` drains on the main thread.
    """

    _FONT = ("微软雅黑", 9)
    _FONT_SMALL = ("微软雅黑", 8)

    def __init__(self):
        super().__init__()
        self.title("M3U8批量下载器V0.1 By豪哥")
        self.geometry("570x450")
        self.resizable(False, False)

        # Shared state.
        self.save_path = tk.StringVar(value=DEFAULT_PATH)
        self.total_ts = 0
        self.finish_ts = 0
        self.failed_ts = 0
        self.msg_queue = queue.Queue()
        self.is_downloading = False
        self.key = None
        self.iv = None
        self.media_sequence = 0
        self.last_clipboard = ""
        self.thread_num = tk.IntVar(value=DEFAULT_THREAD_NUM)
        self.last_link = ""

        # Timing / throughput statistics.
        self.start_time = 0
        self.total_downloaded_bytes = 0
        # Guards the counters that several pool threads update concurrently.
        self._stats_lock = threading.Lock()

        # Batch state.
        self.video_list = []
        self.current_video_idx = 0
        self.total_videos = 0
        self.remaining_videos = 0
        self.video_size = 0.00

        # Snapshots of the Tk variables, taken on the main thread when a
        # batch starts so that worker threads never call into Tk.
        self._save_dir = DEFAULT_PATH
        self._thread_count = DEFAULT_THREAD_NUM

        self.init_ui()
        # Bind before the first clipboard poll so an initial auto-insert
        # is not missed.
        self.link_text.bind("<<Modified>>", self.on_link_changed)
        self.check_queue()
        self.watch_clipboard()

    # ------------------------------------------------------------------ UI
    def init_ui(self):
        """Build all widgets (toolbar, progress row, link list, bottom panels)."""
        self._build_toolbar()
        self._build_progress_row()
        self._build_link_area()
        self._build_bottom_panels()

    def _build_toolbar(self):
        """Top row: save-directory picker, current path and the start button."""
        top_frame = tk.Frame(self, pady=5)
        top_frame.pack(fill=tk.X, padx=10)
        self.btn_path = tk.Button(
            top_frame, text="📂 下载目录", command=self.select_save_path,
            width=12, font=self._FONT,
        )
        self.btn_path.pack(side=tk.LEFT)
        self.label_path = tk.Label(
            top_frame, textvariable=self.save_path, fg="blue", font=self._FONT,
        )
        self.label_path.pack(side=tk.LEFT, padx=5)
        self.btn_start = tk.Button(
            top_frame, text="▶️ 开始下载", command=self.start_batch_download,
            width=12, font=self._FONT,
        )
        self.btn_start.pack(side=tk.RIGHT)

    def _build_progress_row(self):
        """Progress bar with a percentage label."""
        progress_frame = tk.Frame(self, pady=3)
        progress_frame.pack(fill=tk.X, padx=10)
        tk.Label(progress_frame, text="下载进度:", font=self._FONT).pack(side=tk.LEFT)
        self.progress_bar = ttk.Progressbar(progress_frame, mode="determinate", length=380)
        self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.progress_label = tk.Label(progress_frame, text="0%", font=self._FONT)
        self.progress_label.pack(side=tk.LEFT)

    def _build_link_area(self):
        """Link list (one URL per line) plus thread-count selector and clear button."""
        link_frame = tk.Frame(self, pady=5)
        link_frame.pack(fill=tk.X, padx=10)
        tk.Label(link_frame, text="M3U8链接列表(每行一个):", font=self._FONT).pack(anchor=tk.W)
        self.link_text = tk.Text(link_frame, height=4, font=self._FONT)
        self.link_text.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.link_text.edit_modified(False)

        thread_frame = tk.Frame(link_frame)
        thread_frame.pack(side=tk.RIGHT, padx=5)
        tk.Label(thread_frame, text="下载线程:", font=self._FONT).pack(anchor=tk.N)
        self.thread_combobox = ttk.Combobox(
            thread_frame, textvariable=self.thread_num,
            values=[1, 2, 4, 8, 16, 32], state="readonly", width=5,
        )
        self.thread_combobox.current(3)
        self.thread_combobox.pack(anchor=tk.N)
        self.btn_clear = tk.Button(
            thread_frame, text="清空", command=self.clear_link,
            width=6, font=self._FONT,
        )
        self.btn_clear.pack(anchor=tk.N, pady=5)

    def _build_bottom_panels(self):
        """Bottom area: statistics panel on the left, scrolling log on the right."""
        bottom_frame = tk.Frame(self, pady=5)
        bottom_frame.pack(fill=tk.BOTH, expand=True, padx=10)

        self.stats_frame = tk.LabelFrame(
            bottom_frame, text="视频统计信息", font=self._FONT, width=180,
        )
        self.stats_frame.pack(side=tk.LEFT, fill=tk.BOTH, padx=(0, 5))
        # Keep the fixed width instead of shrinking to the labels.
        self.stats_frame.pack_propagate(False)

        self.label_total_videos = self._add_stat_label("总视频数:0")
        self.label_remaining_videos = self._add_stat_label("剩余视频数:0")
        self.label_size = self._add_stat_label(f"大小:{self.video_size:.2f} MB")
        self.label_ts_info = self._add_stat_label(f"TS总数:{self.total_ts}/{self.finish_ts}")
        self.label_speed = self._add_stat_label("总速度:0.00 MB/s")
        self.label_cost = self._add_stat_label("已耗时:00:00")
        self.label_remain = self._add_stat_label("剩余时间:--:--")

        log_frame = tk.LabelFrame(bottom_frame, text="运行日志", font=self._FONT)
        log_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
        log_scroll = tk.Scrollbar(log_frame)
        log_scroll.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text = tk.Text(
            log_frame, height=5, font=self._FONT_SMALL,
            yscrollcommand=log_scroll.set, state=tk.DISABLED,
        )
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        log_scroll.config(command=self.log_text.yview)

    def _add_stat_label(self, text):
        """Create one left-aligned line in the statistics panel and return it."""
        label = tk.Label(self.stats_frame, text=text, font=self._FONT_SMALL)
        label.pack(anchor=tk.W, padx=5, pady=2)
        return label

    # ------------------------------------------------------------- helpers
    def time_format(self, seconds):
        """Format a duration in seconds as ``MM:SS``; negatives give ``00:00``."""
        if seconds < 0:
            return "00:00"
        minutes, secs = divmod(int(seconds), 60)
        return f"{minutes:02d}:{secs:02d}"

    def on_link_changed(self, event=None):
        """React to edits of the link box: refresh counts and pre-parse the first link."""
        # Re-arm the <<Modified>> event for the next edit.
        self.link_text.edit_modified(False)
        # While a batch is running the worker owns the counters; an edit
        # (e.g. a clipboard auto-insert) must not reset them.
        if self.is_downloading:
            return
        lines = self.link_text.get(1.0, tk.END).strip().splitlines()
        valid_links = [ln.strip() for ln in lines if ln.strip().startswith("http")]
        self.total_videos = len(valid_links)
        self.remaining_videos = self.total_videos
        self.update_video_count_labels()

        first_link = valid_links[0] if valid_links else ""
        if first_link == self.last_link:
            return
        self.last_link = first_link
        if not first_link:
            self.reset_stats()
            return
        threading.Thread(
            target=self.pre_parse_m3u8_info, args=(first_link,), daemon=True,
        ).start()

    def update_video_count_labels(self):
        """Refresh the total / remaining video labels (main thread only)."""
        self.label_total_videos.config(text=f"总视频数:{self.total_videos}")
        self.label_remaining_videos.config(text=f"剩余视频数:{self.remaining_videos}")

    def _preparse_is_stale(self, m3u8_url):
        """Return True if a pre-parse result for *m3u8_url* must be discarded."""
        return self.is_downloading or m3u8_url != self.last_link

    def pre_parse_m3u8_info(self, m3u8_url):
        """Fetch a playlist in the background to preview segment count and size.

        Runs in a worker thread.  The result is dropped when a download has
        started meanwhile or the first link has changed, so it can never
        overwrite the counters of a running download.
        """
        try:
            m3u8_obj = m3u8.load(m3u8_url, headers=HEADERS, timeout=TIMEOUT)
            if m3u8_obj.playlists:
                # Master playlist: follow the last listed variant.
                variant_url = urljoin(m3u8_url, m3u8_obj.playlists[-1].uri)
                m3u8_obj = m3u8.load(variant_url, headers=HEADERS, timeout=TIMEOUT)
            segment_count = len(m3u8_obj.segments)
            # Rough estimate: 0.5 MB per second of media.
            estimated_size = sum((seg.duration or 0) * 0.5 for seg in m3u8_obj.segments)
        except Exception:
            # Any load/parse failure just clears the preview.
            if not self._preparse_is_stale(m3u8_url):
                self.reset_stats()
            return
        if self._preparse_is_stale(m3u8_url):
            return
        self.video_size = estimated_size
        self.total_ts = segment_count
        self.finish_ts = 0
        self.update_stats()

    def reset_stats(self):
        """Zero all per-video statistics and refresh the statistics panel."""
        self.video_size = 0.00
        self.total_ts = 0
        self.finish_ts = 0
        self.total_downloaded_bytes = 0
        self.start_time = 0
        self.update_stats()

    def watch_clipboard(self):
        """Poll the clipboard once per second and append new M3U8 links."""
        try:
            content = self.clipboard_get().strip()
        except tk.TclError:
            # Clipboard is empty or holds non-text data.
            content = ""
        is_new_link = (
            content != self.last_clipboard
            and content.startswith("http")
            and "m3u8" in content
        )
        if is_new_link:
            self.last_clipboard = content
            self.link_text.insert(tk.END, content + "\n")
        self.after(1000, self.watch_clipboard)

    def select_save_path(self):
        """Let the user choose the save directory."""
        path = filedialog.askdirectory(title="选择保存目录", initialdir=DEFAULT_PATH)
        if path:
            self.save_path.set(path)

    def clear_link(self):
        """Empty the link box and reset counters and statistics (main thread only)."""
        self.link_text.delete(1.0, tk.END)
        # Forget the previewed link so pasting it again triggers a new pre-parse.
        self.last_link = ""
        self.total_videos = 0
        self.remaining_videos = 0
        self.update_video_count_labels()
        self.reset_stats()

    # ------------------------------------------- thread-safe GUI requests
    def write_log(self, msg):
        """Queue a log line for display."""
        self.msg_queue.put(("log", msg))

    def update_progress(self):
        """Queue a refresh of the progress bar and speed/time labels."""
        self.msg_queue.put(("progress", None))

    def update_stats(self):
        """Queue a refresh of the statistics panel."""
        self.msg_queue.put(("stats", None))

    def download_finish(self):
        """Queue the end-of-batch UI reset."""
        self.msg_queue.put(("finish", None))

    def check_queue(self):
        """Drain the message queue on the main thread and reschedule itself."""
        try:
            while True:
                try:
                    task, data = self.msg_queue.get_nowait()
                except queue.Empty:
                    break
                self._dispatch_message(task, data)
        finally:
            # Always reschedule so one failing handler cannot stop GUI updates.
            self.after(50, self.check_queue)

    def _dispatch_message(self, task, data):
        """Apply one queued GUI request."""
        if task == "log":
            self.log_text.config(state=tk.NORMAL)
            self.log_text.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {data}\n")
            self.log_text.config(state=tk.DISABLED)
            self.log_text.see(tk.END)
        elif task == "progress":
            self._refresh_progress()
        elif task == "progress_reset":
            self.progress_bar["maximum"] = data
            self.progress_bar["value"] = 0
            self.progress_label.config(text="0%")
        elif task == "stats":
            self.label_size.config(text=f"大小:{self.video_size:.2f} MB")
            self.label_ts_info.config(text=f"TS总数:{self.total_ts}/{self.finish_ts}")
            self.label_speed.config(text="总速度:0.00 MB/s")
            self.label_cost.config(text="已耗时:00:00")
            self.label_remain.config(text="剩余时间:--:--")
        elif task == "counts":
            self.update_video_count_labels()
        elif task == "error":
            messagebox.showerror(*data)
        elif task == "info":
            messagebox.showinfo(*data)
        elif task == "clear":
            self.clear_link()
        elif task == "finish":
            self.is_downloading = False
            self.btn_start.config(state=tk.NORMAL, text="▶️ 开始下载")
            self.progress_bar["value"] = 0
            self.progress_label.config(text="0%")

    def _refresh_progress(self):
        """Update progress bar, percentage, speed, elapsed and remaining time."""
        total, done = self.total_ts, self.finish_ts
        percent = int(done / total * 100) if total > 0 else 0
        self.progress_bar["value"] = done
        self.progress_label.config(text=f"{percent}%")
        self.label_ts_info.config(text=f"TS总数:{total}/{done}")
        if done <= 0 or self.start_time <= 0:
            return
        elapsed = time.time() - self.start_time
        speed_mb = (self.total_downloaded_bytes / 1024 / 1024) / elapsed if elapsed > 0 else 0
        # Linear extrapolation from the average time per finished segment.
        remaining = (total - done) * elapsed / done
        self.label_speed.config(text=f"总速度:{speed_mb:.2f} MB/s")
        self.label_cost.config(text=f"已耗时:{self.time_format(elapsed)}")
        self.label_remain.config(text=f"剩余时间:{self.time_format(remaining)}")

    # ------------------------------------------------------------ download
    def get_decrypt_key(self, key_uri, base_url):
        """Download the AES key; return its bytes, or None when the request fails."""
        try:
            key_url = urljoin(base_url, key_uri)
            res = requests.get(key_url, headers=HEADERS, timeout=TIMEOUT)
            res.raise_for_status()
            return res.content
        except requests.RequestException:
            return None

    def decrypt_ts(self, ts_data, index=0):
        """AES-CBC decrypt one segment; return *ts_data* unchanged if not possible.

        Args:
            ts_data: Raw segment bytes.
            index: Zero-based position of the segment in the playlist.  Only
                used when the playlist gives no explicit IV.
        """
        if self.key is None:
            return ts_data
        iv = self.iv
        if iv is None:
            # HLS: without an IV attribute the segment's media sequence
            # number, as a 16-byte big-endian integer, is the IV.
            iv = (self.media_sequence + index).to_bytes(16, "big")
        try:
            cipher = AES.new(self.key, AES.MODE_CBC, iv)
            return unpad(cipher.decrypt(ts_data), AES.block_size)
        except (ValueError, TypeError):
            # Bad key/IV length, unaligned data or invalid padding:
            # fall back to the raw bytes (best effort).
            return ts_data

    def download_ts(self, ts_url, save_dir, index):
        """Download one segment into *save_dir* as ``<index>.ts`` (zero-padded).

        Runs in a pool thread.  Failures are logged and counted, never raised.
        """
        ts_name = f"{index:05d}.ts"
        ts_path = os.path.join(save_dir, ts_name)
        try:
            with requests.get(ts_url, headers=HEADERS, timeout=TIMEOUT, stream=True) as r:
                r.raise_for_status()
                ts_data = self.decrypt_ts(r.content, index)
            with open(ts_path, "wb") as f:
                f.write(ts_data)
        except Exception:
            # Thread boundary: report the failure instead of losing it in the future.
            with self._stats_lock:
                self.failed_ts += 1
            self.write_log(f"失败:{ts_name}")
            return
        with self._stats_lock:
            self.total_downloaded_bytes += len(ts_data)
            self.finish_ts += 1
        self.write_log(f"下载:{ts_name}")
        self.update_progress()

    def merge_ts(self, ts_dir, save_path):
        """Concatenate all ``.ts`` files of *ts_dir* (sorted by name) into one file.

        The output is named after the current timestamp; a numeric suffix is
        appended if that name already exists.  The segment files and the
        temporary directory are removed afterwards.
        """
        ts_files = sorted(name for name in os.listdir(ts_dir) if name.endswith(".ts"))
        stamp = time.strftime("%Y%m%d_%H%M%S")
        video_path = os.path.join(save_path, stamp + ".mp4")
        suffix = 1
        while os.path.exists(video_path):
            video_path = os.path.join(save_path, f"{stamp}_{suffix}.mp4")
            suffix += 1
        with open(video_path, "wb") as out:
            for name in ts_files:
                with open(os.path.join(ts_dir, name), "rb") as part:
                    out.write(part.read())
        for name in ts_files:
            os.remove(os.path.join(ts_dir, name))
        try:
            os.rmdir(ts_dir)
        except OSError:
            # Directory holds unrelated files; the merge itself succeeded.
            pass
        self.write_log(f"✅ 下载完成!文件地址:{video_path}")

    def parse_m3u8(self, m3u8_url):
        """Load a playlist, following a master playlist to its last variant.

        Side effects: sets ``total_ts``, ``video_size``, ``media_sequence``,
        ``key`` and ``iv``.

        Returns:
            ``(playlist_object, base_url)`` where *base_url* is the playlist
            URL up to and including its last ``/``.
        """
        m3u8_obj = m3u8.load(m3u8_url, headers=HEADERS, timeout=TIMEOUT)
        if m3u8_obj.playlists:
            variant_url = urljoin(m3u8_url, m3u8_obj.playlists[-1].uri)
            return self.parse_m3u8(variant_url)

        self.total_ts = len(m3u8_obj.segments)
        # Rough estimate: 0.5 MB per second of media.
        self.video_size = sum((seg.duration or 0) * 0.5 for seg in m3u8_obj.segments)
        self.media_sequence = m3u8_obj.media_sequence or 0

        self.key = None
        self.iv = None
        key_info = m3u8_obj.keys[0] if m3u8_obj.keys else None
        # Skip "METHOD=NONE" / URI-less entries: there is nothing to fetch.
        if key_info and key_info.uri and (key_info.method or "").upper() != "NONE":
            self.key = self.get_decrypt_key(key_info.uri, m3u8_url)
            self.iv = bytes.fromhex(key_info.iv[2:]) if key_info.iv else None
            if self.key is None:
                self.write_log("⚠️ 获取解密密钥失败,分片将不解密直接保存")
        return m3u8_obj, m3u8_url.rsplit("/", 1)[0] + "/"

    def download_single_video(self, m3u8_url):
        """Download, decrypt and merge one playlist (runs in the batch worker thread)."""
        try:
            self.start_time = time.time()
            self.total_downloaded_bytes = 0
            self.finish_ts = 0
            self.failed_ts = 0
            self.write_log(f"🎬 开始下载第 {self.current_video_idx} 个视频")
            m3u8_obj, base_url = self.parse_m3u8(m3u8_url)
            self.update_stats()
            if not m3u8_obj.segments:
                self.write_log("❌ 未找到视频分片!")
                return
            # The progress bar is a Tk widget: reset it via the queue.
            self.msg_queue.put(("progress_reset", self.total_ts))
            self.write_log(f"解析完成!总TS数:{self.total_ts}")

            save_dir = self._save_dir
            ts_dir = os.path.join(save_dir, "temp_ts")
            os.makedirs(ts_dir, exist_ok=True)
            # Leftovers from an interrupted run would be merged into this video.
            for stale in os.listdir(ts_dir):
                if stale.endswith(".ts"):
                    os.remove(os.path.join(ts_dir, stale))

            thread_count = self._thread_count
            self.write_log(f"🚀 启动 {thread_count} 线程下载...")
            # Leaving the "with" block waits for every submitted segment.
            with ThreadPoolExecutor(thread_count) as executor:
                for idx, seg in enumerate(m3u8_obj.segments):
                    executor.submit(self.download_ts, urljoin(base_url, seg.uri), ts_dir, idx)
            if self.failed_ts:
                self.write_log(f"⚠️ {self.failed_ts} 个分片下载失败,视频可能不完整")
            self.merge_ts(ts_dir, save_dir)
        except Exception as e:
            self.write_log(f"💥 下载异常:{str(e)}")

    def batch_download_worker(self):
        """Download every URL in ``self.video_list`` in order (worker thread).

        The list, save directory and thread count are prepared by
        ``start_batch_download`` on the main thread; all GUI effects are
        requested through the message queue.
        """
        try:
            pending = list(self.video_list)
            self.total_videos = len(pending)
            self.remaining_videos = self.total_videos
            self.current_video_idx = 0
            self.msg_queue.put(("counts", None))
            if not pending:
                self.msg_queue.put(("error", ("错误", "请输入有效的M3U8链接!")))
                return
            self.write_log(f"📥 共加载 {self.total_videos} 个视频,开始批量下载")
            for url in pending:
                self.current_video_idx += 1
                # The remaining count drops as soon as a video starts.
                self.remaining_videos -= 1
                self.msg_queue.put(("counts", None))
                self.download_single_video(url)
            self.write_log("🎉 所有视频下载完成!")
            self.msg_queue.put(("info", ("完成", "全部视频下载成功!")))
            self.msg_queue.put(("clear", None))
        except Exception as e:
            self.write_log(f"批量下载异常:{str(e)}")
        finally:
            self.download_finish()

    def start_batch_download(self):
        """Validate input, snapshot the settings and start the worker thread."""
        if self.is_downloading:
            messagebox.showwarning("提示", "正在下载中!")
            return
        # Read every Tk-owned value here, on the main thread.
        lines = self.link_text.get(1.0, tk.END).strip().splitlines()
        self.video_list = [ln.strip() for ln in lines if ln.strip().startswith("http")]
        if not self.video_list:
            messagebox.showerror("错误", "请输入有效的M3U8链接!")
            return
        self._save_dir = self.save_path.get()
        self._thread_count = self.thread_num.get()

        self.is_downloading = True
        self.btn_start.config(state=tk.DISABLED, text="⏸ 下载中...")
        self.log_text.config(state=tk.NORMAL)
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state=tk.DISABLED)
        threading.Thread(target=self.batch_download_worker, daemon=True).start()
if __name__ == "__main__":
    # Create the main window and hand control to the Tk event loop.
    app = M3U8Downloader()
    app.mainloop()