# [Python] listing copied from a forum post (residual "plain-text view / copy code" widget caption)
# -*- coding:utf-8 -*-
# @ FileName :52pojieUI_douyin.py
# @ Author :neteast@52pojie
import os
import re
import subprocess
import threading
import tkinter as tk
import tkinter.font as tkFont
import warnings
import requests
from openpyxl import Workbook
from tkinter import *
import tkinter.ttk as ttk
LOG_LINE_NUM = 0  # lines inserted into the log pane since the last reset (see App._log)


class App:
    """Tkinter window that lists a Douyin author's videos and downloads them.

    The button callbacks start daemon threads for the network work
    (``analysis`` and ``download_video``); those threads update the widgets
    and the log pane directly.
    NOTE(review): this relies on tkinter tolerating widget calls from
    non-main threads -- confirm on the target platform.
    """

    # Everything that is not a CJK ideograph, whitespace or a word character
    # is stripped from names that end up in file-system paths.
    _NAME_FILTER = re.compile(r'[^\u4E00-\u9FA5\s\w]')
    _BLANK_RUN = re.compile(r'\s+')
    _LOG_WIDTH = 25      # log messages are cut to this many characters
    _LOG_CAPACITY = 27   # once the line counter exceeds this, the oldest line is dropped
    _TITLE_LIMIT = 40    # maximum characters of a video title kept in a filename

    def __init__(self, root):
        """Build the widgets on *root*, then initialise state and the workbook."""
        self.test = True
        self.initUi(root)
        self.initData()
        self._new_workbook()

    def _new_workbook(self):
        """Start an empty workbook containing only the header row."""
        self.wb = Workbook()
        self.ws = self.wb.create_sheet(index=0, title='sheet1')
        self.ws.append(['序号', '标题', '点赞数', '评论数', '临时下载链接'])

    def initData(self):
        """Reset the non-widget state (selection, result list, HTTP session, URLs)."""
        self.tvList_var = tk.Variable()   # rows currently selected in the tree view
        self.path = None                  # download directory of the current author
        self.allaweme = []                # every parsed video row of the current author
        self.nickname = '未找到该ID用户或者暂未发布作品'
        self.status_download = True       # cleared by the stop button to end parsing
        self.tag = 'odd'                  # tag used for the most recent log line
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': "Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420+ (KHTML, like Gecko) Version/3.0 Mobile/1C28 Safari/419.3"})
        self.starurl = None
        self.baseurl = 'https://v.douyin.com/'
        self.baseinfo = 'https://www.iesdouyin.com/web/api/v2/user/info/?'
        self.userinfourl = None

    def initUi(self, root):
        """Create and place every widget of the fixed-size main window."""
        root.title("DYDownloader neteast@52pojie 2022/10/12")
        width = 897
        height = 533
        screenwidth = root.winfo_screenwidth()
        screenheight = root.winfo_screenheight()
        # Centre the window on the screen.
        root.geometry('%dx%d+%d+%d' % (width, height,
                                       (screenwidth - width) / 2,
                                       (screenheight - height) / 2))
        root.resizable(width=False, height=False)

        song10 = tkFont.Font(family='宋体', size=10)
        song11 = tkFont.Font(family='宋体', size=11)
        times10 = tkFont.Font(family='Times', size=10)
        # Hold references so the named fonts outlive this method.
        self._fonts = (song10, song11, times10)

        # NOTE(review): tk.Entry has no "text" option; the original passes it
        # anyway, presumably resolved by Tk as an abbreviation of
        # "-textvariable". Kept unchanged -- confirm before removing.

        # --- row 1: author id, fetch button, nickname, video count, stop button ---
        GLabel_515 = tk.Label(root, font=song10, justify="center", text="作者ID")
        GLabel_515.place(x=20, y=10, width=47, height=30)

        self.GLineEdit_332 = tk.Entry(root, borderwidth="1px", justify="center", text="url")
        self.GLineEdit_332.place(x=90, y=10, width=128, height=30)

        self.GButton_701 = tk.Button(root, font=times10, justify="center", text="获取信息",
                                     command=self.GButton_701_command)
        self.GButton_701.place(x=230, y=10, width=70, height=30)

        GLabel_100 = tk.Label(root, font=times10, justify="center", text="昵称")
        GLabel_100.place(x=300, y=10, width=43, height=30)

        self.GLineEdit_281 = tk.Entry(root, borderwidth="1px", font=times10, fg="#333333",
                                      justify="center", text="昵称", state='readonly')
        self.GLineEdit_281.place(x=340, y=10, width=240, height=31)

        self.GLineEdit_690 = tk.Entry(root, borderwidth="1px", font=times10,
                                      justify="center", text="条作品", state='readonly')
        self.GLineEdit_690.place(x=590, y=10, width=90, height=30)

        GLabel_1 = tk.Label(root, font=times10, justify="center", text="条作品")
        GLabel_1.place(x=690, y=10, width=76, height=30)

        self.GButton_676 = tk.Button(root, font=times10, justify="center", text="停止获取",
                                     state='disabled', command=self.GButton_676_command)
        self.GButton_676.place(x=760, y=10, width=131, height=30)

        # --- row 2: open-folder button, save path, download buttons ---
        GButton_55 = tk.Button(root, bg="#efefef", font=times10, fg="#000000",
                               justify="center", text="保存路径", relief="groove",
                               command=self.GButton_55_command)
        GButton_55.place(x=10, y=50, width=70, height=30)

        self.GLineEdit_508 = tk.Entry(root, borderwidth="1px", justify="center",
                                      text="path", state='readonly')
        self.GLineEdit_508.place(x=90, y=50, width=610, height=30)

        self.GButton_40 = tk.Button(root, bg="#f0f0f0", font=times10, fg="#000000",
                                    justify="center", text="下载选定",
                                    command=self.GButton_40_command)
        self.GButton_40.place(x=720, y=50, width=73, height=30)

        self.GButton_333 = tk.Button(root, justify="center", text="下载全部",
                                     state='disabled', command=self.GButton_333_command)
        self.GButton_333.place(x=810, y=50, width=74, height=30)

        # --- log pane (right) ---
        self.GLineEdit_428 = tk.Text(root, borderwidth="1px", font=song11)
        self.GLineEdit_428.place(x=530, y=90, width=362, height=438)
        # Alternating row backgrounds used by _log.
        self.GLineEdit_428.tag_config("even", background='#e0e0e0')
        self.GLineEdit_428.tag_config("odd", background='#ffffff')

        # --- result table (left); the Entry only serves as its container ---
        self.GLineEdit_515 = tk.Entry(root, borderwidth="1px", font=times10, fg="#333333",
                                      justify="center", text="tvlist")
        self.GLineEdit_515.place(x=10, y=90, width=511, height=442)
        yscroll = tk.Scrollbar(self.GLineEdit_515, orient=tk.VERTICAL)
        yscroll.pack(side=tk.RIGHT, fill=tk.Y)
        titles = ('序号', '作品名-(按住ctrl多选,按住shift连选)', '点赞数', '评论数', '下载链接')
        self.tvList = ttk.Treeview(self.GLineEdit_515, columns=titles, style='Treeview',
                                   show='headings', height=21, yscrollcommand=yscroll.set)
        self.tvList.pack()
        for title in titles:
            self.tvList.heading(column=title, text=title, anchor=tk.CENTER)
            self.tvList.column(title, minwidth=20, anchor=tk.CENTER, stretch=True)
        self.tvList.column(titles[0], width=40, anchor=tk.CENTER)
        self.tvList.column(titles[1], width=300, anchor='w')
        self.tvList.column(titles[2], width=75, anchor=tk.CENTER)
        self.tvList.column(titles[3], width=75, anchor=tk.CENTER)
        yscroll.config(command=self.tvList.yview)
        self.tvList.bind("<ButtonRelease-1>", self.get_cursor)

    @classmethod
    def _clean_name(cls, text):
        """Return *text* reduced to characters that are safe in a file or folder name.

        Only CJK ideographs, word characters and whitespace survive; runs of
        whitespace collapse to a single space and the ends are trimmed.
        """
        kept = cls._NAME_FILTER.sub('', text)
        return cls._BLANK_RUN.sub(' ', kept).strip()

    def _start_worker(self, target, *args):
        """Run ``target(*args)`` on a daemon thread and return the thread."""
        worker = threading.Thread(target=target, args=args, daemon=True)
        worker.start()
        return worker

    @staticmethod
    def _fill_readonly(entry, text):
        """Replace the content of a read-only entry widget with *text*."""
        entry['state'] = 'normal'
        entry.delete(0, 'end')
        entry.insert(0, text)
        entry['state'] = 'readonly'

    def get_cursor(self, ev):
        """Remember the values of the rows currently selected in the tree view."""
        self.tvList_var.set([ev.widget.item(idx)['values'] for idx in ev.widget.selection()])

    def GButton_40_command(self):
        """Download only the selected rows, in the background."""
        self._start_worker(self.download_video, True)

    def GButton_55_command(self):
        """Open the download folder shown in the path entry, if any."""
        path = self.GLineEdit_508.get()
        if path:
            self.open_fp(path)

    def GButton_701_command(self):
        """Clear the previous results and start parsing the entered author id."""
        global LOG_LINE_NUM
        authorId = self.GLineEdit_332.get().strip()
        rows = self.tvList.get_children()
        if rows:
            self.tvList.delete(*rows)
        self.GLineEdit_428.delete('1.0', 'end')
        LOG_LINE_NUM = 0
        self.status_download = True
        if not authorId:
            self._log("请输入作者ID")
            return
        # Drop everything collected for the previous author so that
        # "download all" and the spreadsheet only cover the new one.
        self.allaweme = []
        self.tvList_var.set('')
        self._new_workbook()
        self.GButton_676['state'] = 'normal'
        self.starurl = self.baseurl + authorId
        self._log(f'{"-" * 5}开始解析,请稍等{"-" * 5}')
        self._start_worker(self.analysis, False)

    def GButton_676_command(self):
        """Ask the running analysis to stop and disable the stop button."""
        self.status_download = False
        self.GButton_676['state'] = 'disabled'

    def GButton_333_command(self):
        """Download every parsed video, in the background."""
        self._start_worker(self.download_video, False)

    def analysis(self, flag):
        """Resolve the author behind ``self.starurl`` and list all of their videos.

        Fills the nickname / count / path entries, enables "download all",
        parses every result page and finally saves the spreadsheet into the
        author's download folder. When *flag* is true the "author found" log
        line is skipped.
        """
        failure = f'{"-" * 5}获取数据失败,请检查主播ID是否正确{"-" * 5}'

        # The short link redirects to a share URL whose query string
        # identifies the user; _requests returns None when the request failed.
        landing = self._requests('get', self.starurl, decode_level=3)
        landing_url = getattr(landing, 'url', None) or ''
        parts = landing_url.split('?')
        if 'share/user' not in landing_url or len(parts) != 2:
            self._log(failure)
            return
        param = parts[1]

        self.userinfourl = self.baseinfo + param
        userinfo = self._requests('get', self.userinfourl, decode_level=2)
        try:
            user = userinfo['user_info']
            raw_name = user['nickname']
            aweme_count = user['aweme_count']
        except (KeyError, TypeError):
            self._log(failure)
            return

        self.nickname = self._clean_name(str(raw_name)) or '名字全为符号'
        # Set the target folder before the download button becomes usable.
        self.path = os.path.join(os.getcwd(), 'dydownloads', self.nickname)
        if not flag:
            self._log(f'{"-" * 5}找到作者,开始解析作品!{"-" * 5}')
        self._fill_readonly(self.GLineEdit_690, f'{aweme_count}')
        self._fill_readonly(self.GLineEdit_281, f'{self.nickname}')
        self._fill_readonly(self.GLineEdit_508, f'{self.path}')
        self.GButton_333['state'] = 'normal'

        icount = self._collect_videos(param)
        self._log(f'{"-" * 5}全部{aweme_count}个视频已解析完成{icount}个!{"-" * 2}')
        if self.status_download and isinstance(aweme_count, int) and aweme_count > icount:
            self._log(f'作者有隐藏作品{aweme_count - icount}个无法列表')
        try:
            os.makedirs(self.path, exist_ok=True)
            self.wb.save(os.path.join(self.path, self.nickname + '.xlsx'))
        except OSError as e:
            self._log(e)

    def _collect_videos(self, param):
        """Page through the author's post list and return how many videos were parsed."""
        max_cursor = 0
        video_has_more = True
        icount = 0
        while video_has_more and self.status_download:
            json_url = (f'https://www.iesdouyin.com/web/api/v2/aweme/post/?{param}&'
                        f'count=21&max_cursor={max_cursor}')
            page = self._requests('get', json_url, decode_level=2)
            if not isinstance(page, dict):
                # Request failed or the body was not a JSON object.
                self._log(f'{"-" * 5}获取数据失败,请检查主播ID是否正确{"-" * 5}')
                break
            video_has_more = page.get('has_more', False)
            max_cursor = page.get('max_cursor', max_cursor)
            for video in page.get('aweme_list') or ():
                if not self.status_download:
                    self._log(f'{"-" * 5}已停止解析video!{"-" * 5}')
                    break
                icount += 1
                self.analysis_video(video, icount)
        return icount

    def analysis_video(self, video, num=0):
        """Record one video entry in the spreadsheet, the table, the log and ``allaweme``.

        Any error while reading *video* is logged instead of raised so that
        one malformed entry does not end the whole listing.
        """
        try:
            video_desc = video['desc'].replace('\n', ' ')
            likeCount = video['statistics']['digg_count']
            comment_count = video['statistics']['comment_count']
            download_url = f'https://aweme.snssdk.com/aweme/v1/play/?video_id={video["video"]["vid"]}&ratio=1080p'
            row = (num, video_desc, likeCount, comment_count, download_url)
            self.ws.append(list(row))
            self.tvList.insert('', 'end', values=row)
            self._log(f'[{num:0>3d}]{video_desc} {comment_count}评论 {likeCount}人点赞')
            self.allaweme.append(row)
        except Exception as e:
            self._log('获取数据失败,请检查主播ID是否正确,也可能cookies已过期!')
            self._log(f'{e}')

    def download_video(self, flag):
        """Download the selected rows (*flag* true) or every parsed row (*flag* false).

        Each file is saved in ``self.path`` as ``NNN_<title>_.mp4``; rows whose
        download or save fails are logged and skipped.
        """
        vediolist = self.tvList_var.get() if flag else list(self.allaweme)
        if not vediolist or not self.path:
            return
        self._log("-----------开始下载----------")
        for vedio in vediolist:
            num = int(vedio[0])
            # Sanitise first, then fall back, so an all-symbol title still gets a name.
            filename = self._clean_name(str(vedio[1]))[:self._TITLE_LIMIT] or '无标题'
            self._log(f'[开始下载]{num}{filename}')
            response = self._requests('get', vedio[4], decode_level=3)
            if response is None or not response.ok:
                # Do not write an error page to disk as if it were a video.
                self._log(f'[下载失败]{num}{filename}')
                continue
            try:
                self.save_video(self.path, f'{num:0>3d}_' + filename + '_' + '.mp4',
                                response.content)
            except OSError as e:
                self._log(e)
        self._log("-----------下载结束----------")

    def save_video(self, path, filename, video_data):
        """Write *video_data* to ``path/filename``, creating *path* when needed."""
        os.makedirs(path, exist_ok=True)
        with open(os.path.normpath(os.path.join(path, filename)), 'wb') as f:
            f.write(video_data)
        self._log(f' --------[下载完成]--------')

    def open_fp(self, fp):
        """Open folder *fp* in the platform's file manager; log if that is not possible."""
        import platform
        not_ready = f'{"-" * 20}文件还未下载{"-" * 20}'
        if not os.path.exists(fp):
            self._log(not_ready)
            return
        system = platform.system()
        try:
            if system == 'Darwin':
                subprocess.call(["open", fp.replace("\\", "/")])
            elif system == 'Windows':
                os.startfile(fp.replace("/", "\\"))
            else:
                subprocess.call(["xdg-open", fp])
        except OSError:
            self._log(not_ready)

    def _requests(self, method, url, decode_level=1, retry=0, timeout=15, **kwargs):
        """Send a GET/POST through the shared session.

        Returns the body text (``decode_level`` 1), the decoded JSON (2) or the
        response object (any other value). Each failed attempt is logged; after
        ``retry + 1`` failed attempts, or for any other *method*, ``None`` is
        returned.
        """
        if method not in ("get", "post"):
            return None
        # SECURITY: certificate verification is off by default, as in the
        # original; callers may now pass verify=True to turn it on.
        kwargs.setdefault('verify', False)
        # Silence only the warning caused by verify=False, not every warning.
        warnings.filterwarnings('ignore', message='Unverified HTTPS request')
        send = getattr(self.session, method)
        for _ in range(retry + 1):
            try:
                response = send(url, timeout=timeout, **kwargs)
                if decode_level == 1:
                    return response.text
                if decode_level == 2:
                    return response.json()
                return response
            except Exception as e:
                self._log(e)
        return None

    def _log(self, logmsg):
        """Append *logmsg* (any object) to the log pane as one striped line.

        The text is cut to ``_LOG_WIDTH`` characters; once the line counter
        exceeds ``_LOG_CAPACITY`` the oldest line is removed for each new one.
        """
        global LOG_LINE_NUM
        # str() first: callers also pass exception objects, which have no len().
        line = str(logmsg).replace('\n', ' ')[:self._LOG_WIDTH] + "\n"
        self.tag = 'odd' if self.tag == 'even' else 'even'
        if LOG_LINE_NUM <= self._LOG_CAPACITY:
            LOG_LINE_NUM = LOG_LINE_NUM + 1
        else:
            self.GLineEdit_428.delete('1.0', '2.0')
        self.GLineEdit_428.insert('end', line, self.tag)
if __name__ == "__main__":
    # Script entry point: create the root window, build the UI on it and
    # hand control to the Tk event loop.
    root = tk.Tk()
    app = App(root)
    root.mainloop()