# [Python] forum paste header ("view as plain text / copy code") - not part of the program
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import math
import os
import re
import time
import urllib.parse
from typing import List, Dict, Tuple
import requests
class Proxy(object):
    """File-backed pool of ``ip:port`` proxies.

    Sources of free proxy lists:
    https://www.dailiservers.com/free-proxy-list/
    https://proxyscrape.com/free-proxy-list-f

    Proxies live in one of three lists:

    * ``data_work``  - candidates still to be tried; the first entry is
      always the empty string, which stands for "no proxy".
    * ``data_sleep`` - proxies that answered but returned unusable data.
    * ``data_dead``  - proxies that failed outright; these are dropped
      from the file on the next ``save()``.
    """

    DEF_PROXY_FILE_NAME = "audiobook.proxy"

    def __init__(self, file_proxy: str):
        """Load proxies from *file_proxy*.

        When *file_proxy* is empty, a file named ``DEF_PROXY_FILE_NAME``
        in this script's directory is used instead.
        """
        self.file_proxy = file_proxy
        if not self.file_proxy:
            self.file_proxy = os.path.join(os.path.dirname(__file__), self.DEF_PROXY_FILE_NAME)
        self.data_work = self.__load()
        self.data_sleep = []
        self.data_dead = []

    @staticmethod
    def __ip_with_port(proxy) -> bool:
        """Return True when *proxy* looks like ``digits.digits.digits.digits:port``."""
        return re.match(r"^\d+\.\d+\.\d+\.\d+:\d+$", proxy) is not None

    def __load(self) -> List[str]:
        """Read the proxy file and return the initial working list.

        The empty string is always the first value; it means "do not use
        a proxy". Lines that are not ``ip:port`` are ignored. Duplicates
        are removed while keeping the order in which proxies appear in
        the file.
        """
        if not os.path.isfile(self.file_proxy):
            return [""]
        # A dict is used as an insertion-ordered set.
        unique = {}
        with open(self.file_proxy, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if self.__ip_with_port(line):
                    unique[line] = None
        return [""] + list(unique)

    def get(self) -> str:
        """Return the next proxy to try, or ``""`` when the pool is empty."""
        return self.data_work[0] if self.data_work else ""

    def count(self) -> int:
        """Return how many entries (including ``""``) are left to try."""
        return len(self.data_work)

    def empty(self) -> bool:
        """Return True when no entry is left to try."""
        return self.count() == 0

    def feedback_dead(self, proxy: str):
        """Mark *proxy* as unusable and persist the pool without it."""
        if proxy in self.data_work:
            self.data_work.remove(proxy)
        self.data_dead.append(proxy)
        self.save()

    def feedback_sleep(self, proxy: str):
        """Set *proxy* aside (it stays in the saved file) and persist the pool."""
        if proxy in self.data_work:
            self.data_work.remove(proxy)
        self.data_sleep.append(proxy)
        self.save()

    def save(self):
        """Rewrite the proxy file with the working and sleeping proxies.

        Each proxy is written on its own line. The two lists are merged
        before writing so that the last working proxy and the first
        sleeping proxy are never fused onto a single line. The "no proxy"
        placeholder and dead proxies are not written.
        """
        alive = [p for p in self.data_work + self.data_sleep if self.__ip_with_port(p)]
        with open(self.file_proxy, "w", encoding="utf-8") as f:
            f.writelines(p + "\n" for p in alive)
class Ting55Thief(object):
    """Downloader for audiobook chapters hosted on https://ting55.com/.

    Access limit noted by the original author: the same IP may call the
    ``nlinka`` endpoint only 6 times within a short period.
    """

    URL_NOVEL = "https://ting55.com/book/%s"  # GET
    URL_CHAPTER = "https://ting55.com/book/%s-%d"  # GET
    URL_API_RES = "https://ting55.com/nlinka"  # POST
    USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                  " AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.76")
    NLINKA_INTERVAL = 6  # seconds to leave between two nlinka requests
    FILE_SIZE_VALID = 100 * 1024  # downloads smaller than this many bytes are rejected
    _UNSAFE_CHARS = r'[/\\:*?"<>|]+'  # characters replaced by "_" in file names

    def __init__(self, novel_id: str, novel_name: str, voice_actor: str, folder_out: str, file_proxy: str):
        """Set up the HTTP session and fill in missing novel details.

        Args:
            novel_id: numeric id of the novel as it appears in the book URL.
            novel_name: novel title; when empty it is read from the novel page.
            voice_actor: narrator name; when empty it is read from the novel page.
            folder_out: output folder; when empty a folder named after the
                novel is used, next to this script.
            file_proxy: path of the proxy list file handed to ``Proxy``.
        """
        self.novel_id = novel_id
        self.novel_name = novel_name
        self.voice_actor = voice_actor
        self.folder_out = folder_out
        self.chapter_max = 1  # highest chapter number, used for zero-padded names
        self.log_nlinka = [0.0]  # timestamps of nlinka requests
        self.proxy = Proxy(file_proxy)  # proxies used for the nlinka endpoint
        self.session = requests.session()
        self.session.headers.update({
            "User-Agent": self.USER_AGENT,  # required
        })
        self.fix_novel_info()  # fills in name, narrator, output folder, chapter_max

    @classmethod
    def _sanitize(cls, name: str) -> str:
        """Replace every run of characters in ``_UNSAFE_CHARS`` with ``_``."""
        return re.sub(cls._UNSAFE_CHARS, "_", name)

    def __res_url(self, url_page: str, chapter_id: int, token: str, proxy="") -> Tuple[str, int, str]:
        """Ask the nlinka endpoint for the audio URL of one chapter.

        Example response: ``{'ourl': '', 'plink': '', 'url': '', 'status': 1}``.
        Passing ``proxy=""`` sends ``proxies={"http": "", "https": ""}``,
        i.e. no proxy.

        Returns:
            ``(url, status, message)`` where status is
            ``1`` success, ``0`` the request went through but gave no usable
            URL, ``-1`` the resource is not available, ``-2`` the request
            itself failed.
        """
        t = time.time()
        try:
            res = self.session.post(self.URL_API_RES, data={
                "bookId": self.novel_id,  # required
                "page": chapter_id  # required
                # "isPay": 0  # optional
            }, headers={
                "Referer": url_page,  # required
                "xt": token,  # required
                "l": "1"  # required
            }, proxies={
                "http": proxy,
                "https": proxy
            }, timeout=8)
        except requests.exceptions.ReadTimeout:
            return "", -2, "timeout"
        except requests.exceptions.ProxyError:
            return "", -2, "useless proxy"
        except requests.exceptions.ConnectionError:
            return "", -2, "connection error"
        except requests.exceptions.RequestException as err:
            # Any other requests failure is reported as a status, not a crash.
            return "", -2, "request error: %s" % err
        self.log_nlinka.append(t)  # counted whether or not a URL came back
        if res.status_code != 200:
            if res.status_code == 503:
                return "", 0, "503: too fast"
            return "", 0, "status code %d" % res.status_code
        try:
            data = res.json()
        except ValueError:
            # The body was not JSON (JSON decode errors subclass ValueError).
            return "", 0, "invalid response"
        if not isinstance(data, dict):
            return "", 0, "invalid response"
        status = data.get("status")
        if status != 1:
            if status == -1:
                return "", -1, "non-free content"
            return "", -1, "unexpected response: %s" % data
        url = data.get("url")
        if not url:  # request limit reached
            return "", 0, "too many requests"
        return url, 1, ""

    def fix_novel_info(self):
        """Scrape the novel page and fill in whatever the caller left empty.

        Fills in the novel name, the voice actor and the output folder only
        when they are empty; ``chapter_max`` is always overwritten.
        """
        res = self.session.get(self.URL_NOVEL % self.novel_id)
        text = res.text
        novel_name, voice_actor, chapter_max = "", "", 1
        match = re.search('<p>播音:<a[^>]+class="by"[^>]*>(.+?)</a>', text)
        if match:
            voice_actor = match.group(1).strip()
        match = re.search('class="binfo"><h1>(.+?)</h1>', text)
        if match:
            novel_name = match.group(1).strip()
        if voice_actor:
            # Strip decorations from the page title.
            novel_name = re.sub("有声小说$", "", novel_name)
            novel_name = re.sub("[((]子慕[))]", "", novel_name)
            novel_name = novel_name.strip()
        match = re.search(r">(\d+)</a></li></ul>", text)
        if match:
            chapter_max = int(match.group(1))
        if not self.novel_name:
            self.novel_name = novel_name if novel_name else "未知小说"
        if not self.voice_actor:
            self.voice_actor = voice_actor if voice_actor else "未知播音"
        if not self.folder_out:
            self.folder_out = os.path.join(os.path.dirname(__file__), self.novel_name)
        self.chapter_max = chapter_max

    def chapter_url(self, chapter_id: int) -> str:
        """Return the page URL of chapter *chapter_id*."""
        return self.URL_CHAPTER % (self.novel_id, chapter_id)

    def file_name(self, chapter_id: int, chapter_name: str, fmt: str) -> str:
        """Build the output file name for a chapter.

        Format: ``{novel}-{actor}-{chapter index}-{chapter name}{fmt}``.
        The index is zero-padded to the width of ``chapter_max``, and *fmt*
        is appended as-is. Characters in ``_UNSAFE_CHARS`` are replaced
        with ``_``.
        """
        pattern = "%%s-%%s-%%0%dd-%%s%%s" % len(str(self.chapter_max))
        name = pattern % (self.novel_name, self.voice_actor, chapter_id, chapter_name, fmt)
        return self._sanitize(name)

    def get(self, chapter_id: int) -> Tuple[str, str]:
        """Resolve the direct audio URL and the name of a chapter.

        Proxies from the pool are tried in turn until one yields a URL, the
        resource turns out to be unavailable, or the pool is exhausted.

        Args:
            chapter_id: chapter number to resolve.

        Returns:
            ``(audio url, chapter name)``, or ``("", "")`` on any failure.
        """
        url_page = self.chapter_url(chapter_id)
        res = self.session.get(url_page)
        text = res.text
        match = re.search('name="_c" content="(.+?)"', text)
        if not match:
            print("token not found")
            return "", ""
        token = match.group(1)
        while True:
            proxy = self.proxy.get()
            print("try proxy %s" % (proxy if proxy else "null"))
            url_audio, status, msg = self.__res_url(url_page, chapter_id, token, proxy)
            if status == 1:
                break
            if status == -1:  # non-free or invalid resource
                print(msg)
                break
            if status == 0:  # the proxy works but the data is unusable
                print(msg)
                self.proxy.feedback_sleep(proxy)
            else:  # -2: the proxy is useless
                print(msg)
                self.proxy.feedback_dead(proxy)
            if self.proxy.empty():  # no proxy left to try
                break
        if not url_audio:
            print("error in parsing chapter url")
            return "", ""
        match = re.search(r">\s*第(\d+)章\s*(.+?)在线收听\s*<", text)
        if not match:
            print("chapter id and name not found")
            return "", ""
        chapter_id_src = int(match.group(1))
        if chapter_id_src != chapter_id:
            print("chapter id not matched")
            return "", ""
        chapter_name = match.group(2)
        return url_audio, chapter_name

    def download(self, chapter_url: str, chapter_id: int, chapter_name: str) -> str:
        """Download one chapter; the direct link is only valid for a limited time.

        The payload is size-checked before anything is written, so a
        rejected download leaves no file behind for ``downloaded()`` to
        mistake for a finished chapter.

        Args:
            chapter_url: direct audio URL returned by ``get()``.
            chapter_id: chapter number, used in the file name.
            chapter_name: chapter title, used in the file name.

        Returns:
            Path of the saved file, or ``""`` when the URL is empty or the
            payload is smaller than ``FILE_SIZE_VALID``.
        """
        if not chapter_url:
            print("empty chapter url")
            return ""
        _, fmt = os.path.splitext(urllib.parse.urlparse(chapter_url).path)
        name = self.file_name(chapter_id, chapter_name, fmt)
        res = self.session.get(chapter_url)
        content = res.content
        if len(content) < self.FILE_SIZE_VALID:
            print("wrong file size: %.2fkb" % (len(content) / 1024))
            return ""
        # makedirs also creates missing parent folders of a nested output path.
        os.makedirs(self.folder_out, exist_ok=True)
        path = os.path.join(self.folder_out, name)
        with open(path, "wb") as f:
            f.write(content)
        return path

    def downloaded(self) -> List[int]:
        """Return the chapter numbers that already have a file in the output folder.

        The name prefix is sanitised the same way ``file_name()`` does and
        then regex-escaped, so novel or actor names containing characters
        such as ``(``, ``+`` or ``?`` are matched literally.
        """
        if not os.path.isdir(self.folder_out):
            return []
        prefix = self._sanitize("%s-%s-" % (self.novel_name, self.voice_actor))
        pattern = re.compile(re.escape(prefix) + r"(\d+)-.+")
        data = []
        for name in os.listdir(self.folder_out):
            m = pattern.match(name)
            if m:
                data.append(int(m.group(1)))
        return data

    def available(self) -> bool:
        """Return True when more than ``NLINKA_INTERVAL`` seconds have passed since the last nlinka request."""
        return time.time() - self.log_nlinka[-1] > self.NLINKA_INTERVAL

    def wait(self) -> int:
        """Return the whole seconds still to wait before the next nlinka request (never negative)."""
        return max(math.ceil(self.NLINKA_INTERVAL - (time.time() - self.log_nlinka[-1])), 0)

    def config(self) -> Dict:
        """Return the effective settings as a dict (printed by the caller)."""
        return {
            "novel_id": self.novel_id,
            "novel_name": self.novel_name,
            "voice_actor": self.voice_actor,
            "chapter_max": self.chapter_max,
            "folder_out": self.folder_out,
            "file_proxy": self.proxy.file_proxy
        }

    @staticmethod
    def parse_novel_id(novel_url: str) -> str:
        """Extract the numeric novel id from a ting55.com book URL, or return ``""``."""
        if not novel_url:
            return ""
        m = re.search(r"ting55\.com/book/(\d+)", novel_url)
        if m:
            return m.group(1)
        return ""
def parse_config() -> Dict:
    """Parse the command line into the settings dict used by ``run()``.

    argparse how-to: https://docs.python.org/3/howto/argparse.html

    Returns:
        A dict with the keys ``id``, ``name``, ``actor``, ``start``,
        ``end``, ``out`` and ``proxy``. ``end`` is raised to ``start``
        when it is smaller.

    Raises:
        SystemExit: with status 1 when no novel id can be extracted from
            the URL (argparse itself exits on malformed arguments).
    """
    parser = argparse.ArgumentParser(description="audiobook thief, just for https://ting55.com/")
    parser.add_argument(
        "url",
        type=str,
        help="novel url (e.g. https://ting55.com/book/9200)"
    )
    parser.add_argument(
        "--name",
        type=str, default="", required=False,
        help="novel name"
    )
    parser.add_argument(
        "--actor",
        type=str, default="", required=False,
        help="voice actor"
    )
    parser.add_argument(
        "--start",
        type=int, default=1, required=False,
        help="starting chapter index (default 1)"
    )
    parser.add_argument(
        "--end",
        type=int, default=1, required=False,
        help="last chapter index (default 1)"
    )
    parser.add_argument(
        "--out",
        type=str, default="", required=False,
        help="output folder (default same as novel name)"
    )
    parser.add_argument(
        "--proxy",
        type=str, default="", required=False,
        help="text file containing proxy IPs (e.g. 81.10.80.155:8080)"
    )
    args = parser.parse_args()
    novel_id = Ting55Thief.parse_novel_id(args.url)
    if not novel_id:
        print("invalid novel url (e.g. https://ting55.com/book/9200)")
        # Exit with a failure status; the site-provided exit() helper is not
        # guaranteed to exist and would report success (status 0).
        raise SystemExit(1)
    start = args.start
    return {
        "id": novel_id,
        "name": args.name,
        "actor": args.actor,
        "start": start,
        "end": max(start, args.end),  # never end before the first chapter
        "out": args.out,
        "proxy": args.proxy
    }
def now() -> str:
    """Return the current local time formatted as ``HH:MM:SS``."""
    current = datetime.datetime.now()
    return f"{current:%H:%M:%S}"
def run():
    """Download every chapter in the configured range that is not on disk yet.

    Reads the module-level ``CONFIG`` dict (set in the ``__main__`` block).
    Stops at the first chapter whose URL or name cannot be resolved or
    whose download is rejected. Between chapters it sleeps for the time
    reported by ``Ting55Thief.wait()``; there is no sleep after the last
    chapter.
    """
    thief = Ting55Thief(CONFIG["id"], CONFIG["name"], CONFIG["actor"], CONFIG["out"], CONFIG["proxy"])
    print("config:", thief.config())
    done = set(thief.downloaded())
    data_todo = [i for i in range(CONFIG["start"], CONFIG["end"] + 1) if i not in done]
    print("todo: %d" % len(data_todo))
    for position, chapter_id in enumerate(data_todo, start=1):
        print("[%s] %d/%d: %s" % (now(), chapter_id, CONFIG["end"], thief.chapter_url(chapter_id)))
        chapter_url, chapter_name = thief.get(chapter_id)
        print("[%s] %d/%d: %s %s" % (
            now(), chapter_id, CONFIG["end"],
            chapter_name if chapter_name else "null", chapter_url if chapter_url else "null"
        ))
        if not chapter_url or not chapter_name:
            break
        file_saved = thief.download(chapter_url, chapter_id, chapter_name)
        if not file_saved:
            break
        print("[%s] %d/%d: %s %.2fmb" % (
            now(), chapter_id, CONFIG["end"], file_saved, os.path.getsize(file_saved) / 1024 / 1024
        ))
        if position == len(data_todo):
            break  # nothing left to fetch, so no need to wait
        # Compute the delay once so the printed value is the one slept.
        delay = thief.wait()
        print("sleep %d sec..." % delay)
        time.sleep(delay)
if __name__ == "__main__":
    # Script entry point: print the banner, parse the command line into the
    # module-level CONFIG that run() reads, then start downloading.
    VERSION = "1.5.230918"
    print("audiobook thief v%s gitee.com/nguaduot/audiobook" % VERSION)
    CONFIG = parse_config()
    run()