# [Python] plain-text view / copy code  (forum copy-button header; not part of the script)
import requests
import re
from bs4 import BeautifulSoup
import time
import threading
# Module-level state shared by the functions below.
base_url = None  # scheme + host of the share page; assigned in the main block
file_url = None  # listing endpoint URL; assigned by get_file_url()
data_list = []  # accumulates the entries collected by get_data()
def get_headers(referer=None):
    """Build the HTTP headers sent with every request made by this script.

    Args:
        referer: Optional value for the ``Referer`` header. The header is
            left out when this is ``None`` or otherwise falsy.

    Returns:
        A fresh dict containing ``User-Agent`` and ``Accept-Language``, plus
        ``Referer`` when *referer* was supplied.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:130.0) Gecko/20100101 Firefox/130.0",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    }
    if referer:
        headers["Referer"] = referer
    return headers
# Fetch the share page and derive the file-listing URL from it.
def get_file_url(url, timeout=30):
    """Download *url*, parse it, and extract the listing endpoint.

    The module-level ``base_url`` must already be set; the path captured from
    the page's ``url : '...'`` snippet is appended to it. The resulting value
    is also stored in the module-level ``file_url``.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``(bs, title, file_url)`` where ``bs`` is the parsed page, ``title``
        is the text of its ``<title>`` (``None`` if the page has none) and
        ``file_url`` is the absolute listing URL (``None`` if the pattern was
        not found). ``(None, None, None)`` when the request fails.
    """
    global base_url, file_url
    try:
        response = requests.get(url, headers=get_headers(), timeout=timeout)
        response.raise_for_status()  # treat HTTP error statuses as failures
        bs = BeautifulSoup(response.text, 'lxml')
        # A page without <title> would otherwise raise AttributeError here.
        title = bs.title.text if bs.title is not None else None
        file_url_match = re.search(r"url : '(.*?)',", str(bs))
        file_url = base_url + file_url_match.group(1) if file_url_match else None
        return bs, title, file_url
    except requests.RequestException as e:
        print(f"获取文件链接出错: {e}")
        return None, None, None
# Extract JavaScript string variables from page source.
def get_variables(response_text):
    """Collect ``var name = 'value';`` assignments found in *response_text*.

    Only single-quoted values terminated by ``;`` are recognised. When a name
    is assigned more than once, the last occurrence wins.

    Args:
        response_text: Text to scan (HTML or JavaScript source).

    Returns:
        A dict mapping each variable name to its string value; empty when
        nothing matches.
    """
    pattern = r"var\s+(\w+)\s*=\s*'([^']*)';"
    return dict(re.findall(pattern, response_text))
# Download one file and save it to disk.
def download_file(url, name, timeout=30):
    """Fetch *url* and write the response body to the file *name*.

    Failures are reported on stdout instead of being raised, so one failed
    download does not abort the caller.

    Args:
        url: Address of the file to download.
        name: Path of the local file to create (opened in binary write mode).
        timeout: Seconds to wait for the server before giving up.
    """
    try:
        response = requests.get(url, headers=get_headers(), timeout=timeout)
        response.raise_for_status()  # treat HTTP error statuses as failures
        with open(name, 'wb') as f:
            f.write(response.content)
        print(f'{name} 下载完成')
    except requests.RequestException as e:
        # Must precede OSError: requests' exceptions derive from IOError.
        print(f"下载文件出错: {e}")
    except OSError as e:
        # Invalid file name, missing directory, full disk, ...
        print(f"保存文件 {name} 出错: {e}")
# Resolve the real download link for one listed file and download it.
def process_data(item, timeout=30):
    """Resolve the download URL of *item* and hand it to ``download_file``.

    Steps, as performed below:
      1. GET ``{base_url}/{item['id']}`` and locate the ``iframe`` with class
         ``n_downlink``.
      2. GET the iframe page and pull out the ``url : '...'`` endpoint, the
         ``data : {...}`` literal and the ``var x = '...';`` assignments.
      3. Evaluate the ``data`` literal with those variables, POST it to the
         endpoint and build the link from the ``dom`` and ``url`` fields of
         the JSON reply.

    The function returns without downloading when any of these pieces is
    missing. Request errors are printed rather than raised.

    Args:
        item: Mapping with at least the keys ``'name_all'`` (used as the
            local file name) and ``'id'``.
        timeout: Seconds to wait on each HTTP request.
    """
    name = item['name_all']
    detail_url = f'{base_url}/{item["id"]}'
    try:
        response = requests.get(detail_url, headers=get_headers(), timeout=timeout)
        response.raise_for_status()  # treat HTTP error statuses as failures
        response_bs = BeautifulSoup(response.text, 'lxml')
        iframe = response_bs.find('iframe', class_='n_downlink')
        if not iframe or not iframe.get('src'):
            return

        src_value = base_url + iframe['src']
        response = requests.get(src_value, headers=get_headers(), timeout=timeout)
        response.raise_for_status()
        response.encoding = 'utf-8'
        # Quote bare `=1;` assignments so get_variables() can pick them up.
        response_text = response.text.replace('=1;', "= '1';")

        url_match = re.search(r"url : '(.*?)',", response_text)
        data_match = re.search(r"data : ({.*?}),", response_text)
        if not url_match or not data_match:
            return
        url = base_url + url_match.group(1)

        # Keep this page's variables in a per-call namespace. Writing them
        # into globals() from several threads at once let one thread's values
        # overwrite another's before eval() ran, and could also clobber
        # module-level names.
        page_variables = get_variables(response_text)
        # NOTE(security): eval() executes text taken from a remote page; only
        # use this against a site you trust. Names not defined on this page
        # still fall back to module globals.
        data = eval(data_match.group(1).replace(' ', ''), globals(), page_variables)

        response = requests.post(url, headers=get_headers(src_value), data=data, timeout=timeout)
        response.raise_for_status()
        response_json = response.json()
        print(response_json)

        dom = response_json.get('dom') if isinstance(response_json, dict) else None
        path = response_json.get('url') if isinstance(response_json, dict) else None
        if not isinstance(dom, str) or not isinstance(path, str) or not dom or not path:
            # The reply carries no usable link; avoid a KeyError/TypeError.
            print(f"处理 {name} 时出错: 未获取到下载地址")
            return
        download_url = dom + '/file/' + path
        download_file(download_url, name)
    except requests.RequestException as e:
        print(f"处理 {name} 时出错: {e}")
# Page through the file listing and collect its entries.
def get_data(max_pages=9, timeout=30):
    """Fill the module-level ``data_list`` with the listing entries.

    Reads the module-level ``bs`` (parsed share page) and ``file_url``. The
    ``data : {...}`` literal found in the page is evaluated once per page with
    ``pgs`` bound to the page number, then POSTed to ``file_url``. Paging
    stops when the reply's ``info`` equals ``'没有了'``, when a request
    fails, when the reply has no list under ``text``, or after *max_pages*.

    Args:
        max_pages: Highest page number to request (pages start at 1).
        timeout: Seconds to wait on each HTTP request.
    """
    # The page source does not change between iterations, so parse it once.
    data_match = re.search(r"data : ({.*?}),", str(bs).replace('\n', ''))
    if data_match is None:
        # Without the request template there is nothing to POST.
        print("获取文件列表出错: 页面中未找到请求参数")
        return
    data_expr = data_match.group(1).replace('\t', '').replace(' ', '')

    for pgs in range(1, max_pages + 1):
        # NOTE(security): eval() executes text taken from a remote page; only
        # use this against a site you trust. The literal refers to `pgs` and
        # to names previously placed in module globals.
        data = eval(data_expr, globals(), {'pgs': pgs})
        time.sleep(1)  # pause between page requests
        try:
            response = requests.post(file_url, headers=get_headers(file_url), data=data, timeout=timeout)
            response.raise_for_status()
            response_json = response.json()
        except requests.RequestException as e:
            # Keep whatever was collected so far instead of crashing.
            print(f"获取文件列表出错: {e}")
            break
        if response_json.get('info') == '没有了':
            break
        entries = response_json.get('text')
        if not isinstance(entries, list):
            # A non-list payload cannot be appended to data_list.
            break
        data_list.extend(entries)
# Worker thread that processes a single listing entry.
class DownloadThread(threading.Thread):
    """Thread that runs ``process_data`` on one item."""

    def __init__(self, item):
        """Store *item* for later processing; the thread is not started here."""
        super().__init__()
        self.item = item

    def run(self):
        """Thread body: process the stored item."""
        process_data(self.item)
# Main program
if __name__ == "__main__":
    url = 'https://www.lanzov.com/b00l10ptud'
    # Scheme + host of the share link; other URLs are built relative to it.
    base_url = re.search(r"(https?://[^/]+)", url).group(1)
    bs, title, file_url = get_file_url(url)
    if bs is not None:
        # The variables are read from the third <script> tag, up to the first
        # 'function'. Guard the lookup so a different page layout gives a
        # message instead of an IndexError/AttributeError.
        scripts = bs.find_all('script')
        inline_js = scripts[2].string if len(scripts) > 2 else None
        if inline_js:
            variables = get_variables(str(inline_js.split('function')[0]))
            # NOTE(security): this injects names taken from the remote page
            # into module globals (get_data()'s eval relies on them); a page
            # could overwrite any module-level name this way.
            globals().update(variables)
            get_data()  # fill data_list
        else:
            print("获取文件列表出错: 页面中未找到变量脚本")

        # Create and start one download thread per listed file.
        threads = []
        for item in data_list:
            thread = DownloadThread(item)
            thread.start()
            threads.append(thread)
        # Wait for every thread to finish.
        for thread in threads:
            thread.join()
        print('所有文件下载完成')