吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 760|回复: 19
收起左侧

[Python 原创] 一款m3u8下载器

  [复制链接]
szsnk144864 发表于 2026-4-12 18:25
一款m3u8批量下载的软件,支持部分分分加密的m3u8文件,有些加密的应该是没办法搞定


当复制到链接的时候会自动粘贴进入,不需要自己手动点复制粘贴


其它信息如图,自己测试了还能用,不知道其他老铁们能不能正常用,有兴趣的可以源码查看修改.....


不喜勿喷.....


成品链接:https://szsnk144864.lanzoue.com/iHfFg3n2wzja
ScreenShot_2026-04-12_181156_317.png
[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import threading
import queue
import os
import time
import requests
import m3u8
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

# -------------------------- 配置项 --------------------------
DEFAULT_THREAD_NUM = 8
TIMEOUT = 15
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows 10; Win64; x64) AppleWebKit/537.36"
}
DEFAULT_PATH = os.path.join(os.path.expanduser("~"), "Desktop")


# ------------------------------------------------------------

class M3U8Downloader(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("M3U8批量下载器V0.1  By豪哥")
        self.geometry("570x450")
        self.resizable(False, False)

        # 全局变量
        self.save_path = tk.StringVar(value=DEFAULT_PATH)
        self.total_ts = 0
        self.finish_ts = 0
        self.msg_queue = queue.Queue()
        self.is_downloading = False
        self.key = None
        self.iv = None
        self.last_clipboard = ""
        self.thread_num = tk.IntVar(value=DEFAULT_THREAD_NUM)
        self.last_link = ""

        # 下载计时&速度统计
        self.start_time = 0
        self.total_downloaded_bytes = 0

        # 批量下载变量
        self.video_list = []
        self.current_video_idx = 0
        self.total_videos = 0
        self.remaining_videos = 0  # 剩余视频数量
        self.video_size = 0.00

        self.init_ui()
        self.check_queue()
        self.watch_clipboard()
        self.link_text.bind("<<Modified>>", self.on_link_changed)

    def init_ui(self):
        """初始化界面"""
        # ====================== 顶部按钮区域 ======================
        top_frame = tk.Frame(self, pady=5)
        top_frame.pack(fill=tk.X, padx=10)

        self.btn_path = tk.Button(
            top_frame, text="&#128194; 下载目录", command=self.select_save_path,
            width=12, font=("微软雅黑", 9)
        )
        self.btn_path.pack(side=tk.LEFT)

        self.label_path = tk.Label(
            top_frame, textvariable=self.save_path, fg="blue", font=("微软雅黑", 9)
        )
        self.label_path.pack(side=tk.LEFT, padx=5)

        self.btn_start = tk.Button(
            top_frame, text="&#9654;&#65039; 开始下载", command=self.start_batch_download,
            width=12, font=("微软雅黑", 9)
        )
        self.btn_start.pack(side=tk.RIGHT)

        # ====================== 进度条+百分比 ======================
        progress_frame = tk.Frame(self, pady=3)
        progress_frame.pack(fill=tk.X, padx=10)
        tk.Label(progress_frame, text="下载进度:", font=("微软雅黑", 9)).pack(side=tk.LEFT)

        self.progress_bar = ttk.Progressbar(progress_frame, mode="determinate", length=380)
        self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)

        self.progress_label = tk.Label(progress_frame, text="0%", font=("微软雅黑", 9))
        self.progress_label.pack(side=tk.LEFT)

        # ====================== M3U8链接列表 + 线程选择 ======================
        link_frame = tk.Frame(self, pady=5)
        link_frame.pack(fill=tk.X, padx=10)
        tk.Label(link_frame, text="M3U8链接列表(每行一个):", font=("微软雅黑", 9)).pack(anchor=tk.W)

        self.link_text = tk.Text(link_frame, height=4, font=("微软雅黑", 9))
        self.link_text.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
        self.link_text.edit_modified(False)

        # 右侧:线程数选择
        thread_frame = tk.Frame(link_frame)
        thread_frame.pack(side=tk.RIGHT, padx=5)

        tk.Label(thread_frame, text="下载线程:", font=("微软雅黑", 9)).pack(anchor=tk.N)
        self.thread_combobox = ttk.Combobox(
            thread_frame, textvariable=self.thread_num,
            values=[1, 2, 4, 8, 16, 32], state="readonly", width=5
        )
        self.thread_combobox.current(3)
        self.thread_combobox.pack(anchor=tk.N)

        self.btn_clear = tk.Button(
            thread_frame, text="清空", command=self.clear_link,
            width=6, font=("微软雅黑", 9)
        )
        self.btn_clear.pack(anchor=tk.N, pady=5)

        # ====================== 左右分栏布局 ======================
        bottom_frame = tk.Frame(self, pady=5)
        bottom_frame.pack(fill=tk.BOTH, expand=True, padx=10)

        # 左侧:视频统计信息
        self.stats_frame = tk.LabelFrame(bottom_frame, text="视频统计信息", font=("微软雅黑", 9), width=180)
        self.stats_frame.pack(side=tk.LEFT, fill=tk.BOTH, padx=(0, 5))
        self.stats_frame.pack_propagate(False)

        # ========== 【总视频 + 剩余视频】 ==========
        self.label_total_videos = tk.Label(self.stats_frame, text=f"总视频数:0", font=("微软雅黑", 8))
        self.label_total_videos.pack(anchor=tk.W, padx=5, pady=2)

        self.label_remaining_videos = tk.Label(self.stats_frame, text=f"剩余视频数:0", font=("微软雅黑", 8))
        self.label_remaining_videos.pack(anchor=tk.W, padx=5, pady=2)
        # ==========================================

        # 视频大小
        self.label_size = tk.Label(self.stats_frame, text=f"大小:{self.video_size:.2f} MB", font=("微软雅黑", 8))
        self.label_size.pack(anchor=tk.W, padx=5, pady=2)
        # TS总数/已下载
        self.label_ts_info = tk.Label(self.stats_frame, text=f"TS总数:{self.total_ts}/{self.finish_ts}", font=("微软雅黑", 8))
        self.label_ts_info.pack(anchor=tk.W, padx=5, pady=2)
        # 总下载速度
        self.label_speed = tk.Label(self.stats_frame, text="总速度:0.00 MB/s", font=("微软雅黑", 8))
        self.label_speed.pack(anchor=tk.W, padx=5, pady=2)
        # 已耗时
        self.label_cost = tk.Label(self.stats_frame, text="已耗时:00:00", font=("微软雅黑", 8))
        self.label_cost.pack(anchor=tk.W, padx=5, pady=2)
        # 预估剩余时间
        self.label_remain = tk.Label(self.stats_frame, text="剩余时间:--:--", font=("微软雅黑", 8))
        self.label_remain.pack(anchor=tk.W, padx=5, pady=2)

        # 右侧:运行日志
        log_frame = tk.LabelFrame(bottom_frame, text="运行日志", font=("微软雅黑", 9))
        log_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

        log_scroll = tk.Scrollbar(log_frame)
        log_scroll.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text = tk.Text(
            log_frame, height=5, font=("微软雅黑", 8),
            yscrollcommand=log_scroll.set, state=tk.DISABLED
        )
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        log_scroll.config(command=self.log_text.yview)

    def time_format(self, seconds):
        """秒数转换为 分:秒 格式"""
        if seconds < 0:
            return "00:00"
        m = int(seconds // 60)
        s = int(seconds % 60)
        return f"{m:02d}:{s:02d}"

    def on_link_changed(self, event):
        """链接框内容变化 → 自动更新总视频数"""
        self.link_text.edit_modified(False)
        links = self.link_text.get(1.0, tk.END).strip().splitlines()
        valid_links = [l.strip() for l in links if l.strip().startswith("http")]

        # 自动统计视频数量
        self.total_videos = len(valid_links)
        self.remaining_videos = self.total_videos
        self.update_video_count_labels()

        if self.is_downloading:
            return

        first_link = next((l.strip() for l in valid_links), "")
        if first_link == self.last_link:
            return
        self.last_link = first_link
        if not first_link:
            self.reset_stats()
            return
        threading.Thread(target=self.pre_parse_m3u8_info, args=(first_link,), daemon=True).start()

    def update_video_count_labels(self):
        """更新总视频数 / 剩余视频数"""
        self.label_total_videos.config(text=f"总视频数:{self.total_videos}")
        self.label_remaining_videos.config(text=f"剩余视频数:{self.remaining_videos}")

    def pre_parse_m3u8_info(self, m3u8_url):
        """预解析M3U8信息"""
        try:
            m3u8_obj = m3u8.load(m3u8_url, headers=HEADERS, timeout=TIMEOUT)
            temp_size = 0.0
            if m3u8_obj.playlists:
                new_url = urljoin(m3u8_url, m3u8_obj.playlists[-1].uri)
                m3u8_obj = m3u8.load(new_url, headers=HEADERS, timeout=TIMEOUT)
            temp_total_ts = len(m3u8_obj.segments)
            for seg in m3u8_obj.segments:
                temp_size += seg.duration * 0.5
            self.video_size = temp_size
            self.total_ts = temp_total_ts
            self.finish_ts = 0
            self.update_stats()
        except Exception as e:
            self.reset_stats()

    def reset_stats(self):
        """重置统计信息"""
        self.video_size = 0.00
        self.total_ts = 0
        self.finish_ts = 0
        self.total_downloaded_bytes = 0
        self.start_time = 0
        self.update_stats()

    def watch_clipboard(self):
        """监听剪贴板,自动换行添加链接"""
        try:
            content = self.clipboard_get().strip()
            if content != self.last_clipboard and content.startswith("http") and ("m3u8" in content):
                self.last_clipboard = content
                self.link_text.insert(tk.END, content + "\n")
        except:
            pass
        self.after(1000, self.watch_clipboard)

    def select_save_path(self):
        path = filedialog.askdirectory(title="选择保存目录", initialdir=DEFAULT_PATH)
        if path:
            self.save_path.set(path)

    def clear_link(self):
        self.link_text.delete(1.0, tk.END)
        self.total_videos = 0
        self.remaining_videos = 0
        self.update_video_count_labels()
        self.reset_stats()

    def write_log(self, msg):
        self.msg_queue.put(("log", msg))

    def update_progress(self):
        self.msg_queue.put(("progress", None))

    def update_stats(self):
        self.msg_queue.put(("stats", None))

    def download_finish(self):
        self.msg_queue.put(("finish", None))

    def check_queue(self):
        """刷新GUI(线程安全)"""
        while not self.msg_queue.empty():
            task, data = self.msg_queue.get()
            if task == "log":
                self.log_text.config(state=tk.NORMAL)
                self.log_text.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] {data}\n")
                self.log_text.config(state=tk.DISABLED)
                self.log_text.see(tk.END)
            elif task == "progress":
                percent = int(self.finish_ts / self.total_ts * 100) if self.total_ts > 0 else 0
                self.progress_bar["value"] = self.finish_ts
                self.progress_label.config(text=f"{percent}%")
                self.label_ts_info.config(text=f"TS总数:{self.total_ts}/{self.finish_ts}")
                if self.finish_ts > 0 and self.start_time > 0:
                    cost_time = time.time() - self.start_time
                    speed_mb = (self.total_downloaded_bytes / 1024 / 1024) / cost_time if cost_time > 0 else 0
                    remain_ts = self.total_ts - self.finish_ts
                    remain_time = (remain_ts * cost_time) / self.finish_ts if self.finish_ts > 0 else 0
                    self.label_speed.config(text=f"总速度:{speed_mb:.2f} MB/s")
                    self.label_cost.config(text=f"已耗时:{self.time_format(cost_time)}")
                    self.label_remain.config(text=f"剩余时间:{self.time_format(remain_time)}")
            elif task == "stats":
                self.label_size.config(text=f"大小:{self.video_size:.2f} MB")
                self.label_ts_info.config(text=f"TS总数:{self.total_ts}/{self.finish_ts}")
                self.label_speed.config(text="总速度:0.00 MB/s")
                self.label_cost.config(text="已耗时:00:00")
                self.label_remain.config(text="剩余时间:--:--")
            elif task == "finish":
                self.is_downloading = False
                self.btn_start.config(state=tk.NORMAL, text="&#9654;&#65039; 开始下载")
                self.progress_label.config(text="0%")
        self.after(50, self.check_queue)

    def get_decrypt_key(self, key_uri, base_url):
        try:
            key_url = urljoin(base_url, key_uri)
            res = requests.get(key_url, headers=HEADERS, timeout=TIMEOUT)
            res.raise_for_status()
            return res.content
        except:
            return None

    def decrypt_ts(self, ts_data):
        if self.key is None:
            return ts_data
        try:
            cipher = AES.new(self.key, AES.MODE_CBC, self.iv or self.key)
            return unpad(cipher.decrypt(ts_data), AES.block_size)
        except:
            return ts_data

    def download_ts(self, ts_url, save_dir, index):
        """下载单个TS分片"""
        ts_name = f"{index:05d}.ts"
        ts_path = os.path.join(save_dir, ts_name)
        try:
            with requests.get(ts_url, headers=HEADERS, timeout=TIMEOUT, stream=True) as r:
                r.raise_for_status()
                ts_data = self.decrypt_ts(r.content)
                self.total_downloaded_bytes += len(ts_data)
                with open(ts_path, "wb") as f:
                    f.write(ts_data)
            self.write_log(f"下载:{ts_name}")
            self.finish_ts += 1
            self.update_progress()
        except Exception as e:
            self.write_log(f"失败:{ts_name}")

    def merge_ts(self, ts_dir, save_path):
        video_name = time.strftime("%Y%m%d_%H%M%S") + ".mp4"
        video_path = os.path.join(save_path, video_name)
        ts_files = sorted([f for f in os.listdir(ts_dir) if f.endswith(".ts")])
        with open(video_path, "wb") as out:
            for ts in ts_files:
                with open(os.path.join(ts_dir, ts), "rb") as f:
                    out.write(f.read())
        for ts in ts_files:
            os.remove(os.path.join(ts_dir, ts))
        os.rmdir(ts_dir)
        self.write_log(f"&#9989; 下载完成!文件地址:{video_path}")

    def parse_m3u8(self, m3u8_url):
        m3u8_obj = m3u8.load(m3u8_url, headers=HEADERS, timeout=TIMEOUT)
        self.video_size = 0.0
        if m3u8_obj.playlists:
            new_url = urljoin(m3u8_url, m3u8_obj.playlists[-1].uri)
            return self.parse_m3u8(new_url)
        self.total_ts = len(m3u8_obj.segments)
        for seg in m3u8_obj.segments:
            self.video_size += seg.duration * 0.5
        self.key = None
        self.iv = None
        if m3u8_obj.keys and m3u8_obj.keys[0]:
            key_info = m3u8_obj.keys[0]
            self.key = self.get_decrypt_key(key_info.uri, m3u8_url)
            self.iv = bytes.fromhex(key_info.iv[2:]) if key_info.iv else None
        return m3u8_obj, m3u8_url.rsplit("/", 1)[0] + "/"

    def download_single_video(self, m3u8_url):
        """下载单个视频"""
        try:
            self.start_time = time.time()
            self.total_downloaded_bytes = 0
            self.write_log(f"&#127916; 开始下载第 {self.current_video_idx} 个视频")
            m3u8_obj, base_url = self.parse_m3u8(m3u8_url)
            self.update_stats()
            if not m3u8_obj.segments:
                self.write_log("&#10060; 未找到视频分片!")
                return
            self.finish_ts = 0
            self.progress_bar["maximum"] = self.total_ts
            self.progress_bar["value"] = 0
            self.write_log(f"解析完成!总TS数:{self.total_ts}")
            ts_dir = os.path.join(self.save_path.get(), "temp_ts")
            os.makedirs(ts_dir, exist_ok=True)
            thread_count = self.thread_num.get()
            self.write_log(f"&#128640; 启动 {thread_count} 线程下载...")
            with ThreadPoolExecutor(thread_count) as executor:
                for idx, url in enumerate([urljoin(base_url, seg.uri) for seg in m3u8_obj.segments]):
                    executor.submit(self.download_ts, url, ts_dir, idx)
            self.merge_ts(ts_dir, self.save_path.get())
        except Exception as e:
            self.write_log(f"&#128165; 下载异常:{str(e)}")

    def batch_download_worker(self):
        """批量下载主逻辑"""
        try:
            links = self.link_text.get(1.0, tk.END).strip().splitlines()
            self.video_list = [link.strip() for link in links if link.strip().startswith("http")]
            self.total_videos = len(self.video_list)
            self.remaining_videos = self.total_videos
            self.current_video_idx = 0
            self.update_video_count_labels()

            if self.total_videos == 0:
                messagebox.showerror("错误", "请输入有效的M3U8链接!")
                return

            self.write_log(f"&#128229; 共加载 {self.total_videos} 个视频,开始批量下载")

            for url in self.video_list:
                self.current_video_idx += 1
                # ========== 【核心修改:开始下载前就 -1】 ==========
                self.remaining_videos -= 1
                self.update_video_count_labels()
                # ================================================
                self.download_single_video(url)

            self.write_log("&#127881; 所有视频下载完成!")
            messagebox.showinfo("完成", "全部视频下载成功!")
            self.clear_link()

        except Exception as e:
            self.write_log(f"批量下载异常:{str(e)}")
        finally:
            self.download_finish()

    def start_batch_download(self):
        """启动批量下载"""
        if self.is_downloading:
            messagebox.showwarning("提示", "正在下载中!")
            return
        self.is_downloading = True
        self.btn_start.config(state=tk.DISABLED, text="&#9208; 下载中...")
        self.log_text.config(state=tk.NORMAL)
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state=tk.DISABLED)
        threading.Thread(target=self.batch_download_worker, daemon=True).start()


if __name__ == "__main__":
    app = M3U8Downloader()
    app.mainloop()

免费评分

参与人数 8吾爱币 +7 热心值 +7 收起 理由
lzlass + 1 + 1 谢谢@Thanks!
mmp211 + 1 + 1 用心讨论,共获提升!
huagen1015 + 1 + 1 用心讨论,共获提升!
7ray + 1 谢谢@Thanks!
psqladm + 1 + 1 热心回复!
yanglinman + 1 谢谢@Thanks!
wanfon + 1 + 1 热心回复!
sxp3468 + 1 + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

Lss666888 发表于 2026-4-12 18:47
感谢大佬分享 测试一下
whz20093177 发表于 2026-4-12 18:51
sxp3468 发表于 2026-4-12 18:59
Sandyang 发表于 2026-4-12 19:28
我还在用N_m3u8DL-CLI_v2.9.9_with_ffmpeg_and_SimpleG,不知道你这个能不能合并TS,,
zdwseed 发表于 2026-4-12 19:35
感谢楼主分享好软件,下载使用先
Theresae 发表于 2026-4-12 19:52
感谢楼主分享
alanlyf 发表于 2026-4-12 20:02
要带下载地址嗅探功能就更棒了,不过这也很不错了,支持一下
penz 发表于 2026-4-12 20:27
不错,不时会用到
刘洪1964 发表于 2026-4-12 21:01
谢谢分享,感谢
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-4-13 05:54

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表