import os
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from bs4 import BeautifulSoup
# Index page of the novel on paozww.com; chapter links are resolved
# relative to this book.
BOOK_URL = "https://www.paozww.com/biquge/414243/"

# Browser-like headers sent with every request.
# BUG FIX: "br" (Brotli) was removed from Accept-Encoding — `requests`
# cannot decode Brotli bodies unless the optional `brotli` package is
# installed, so advertising it risks receiving an undecodable response.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
    "Referer": BOOK_URL,
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

# Output directory (named after the book title), created next to this script.
DOWNLOAD_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "一夕千悟")
def get_book_info():
    """Fetch the novel's index page and extract the book's metadata.

    Returns:
        dict with keys:
            "title"        -- book title from the og:title meta tag
                              (text before the first '_' separator),
            "author"       -- author name from the og:novel:author meta tag,
            "session"      -- the requests.Session used (reused by the
                              chapter downloader to keep cookies/keep-alive),
            "chapter_list" -- chapter <a> tags from the index listing,
        or None on any failure (a message is printed instead of raising).
    """
    session = requests.Session()
    try:
        response = None
        # Up to 3 attempts. BUG FIX: the original did not catch network
        # exceptions inside the loop, so the very first ConnectionError or
        # timeout skipped the remaining retries entirely.
        for _ in range(3):
            try:
                response = session.get(BOOK_URL, headers=HEADERS, timeout=20)
                if response.status_code == 200:
                    break
            except requests.RequestException:
                pass  # fall through to the sleep and retry
            time.sleep(2)
        else:
            # Loop exhausted without a 200 response.
            if response is None:
                raise Exception("无法获取页面,状态码:无响应")
            raise Exception(f"无法获取页面,状态码:{response.status_code}")
        # Site serves GBK-family text; decode as the gb18030 superset.
        response.encoding = 'gb18030'
        soup = BeautifulSoup(response.text, 'html.parser')
        return {
            "title": soup.find('meta', {'property': 'og:title'})['content'].split('_')[0],
            "author": soup.find('meta', {'property': 'og:novel:author'})['content'],
            "session": session,
            # Skip the first 12 pseudo-chapters ("latest chapters" duplicates
            # shown at the top of the index).
            "chapter_list": soup.select("#list > dl > dd > a")[12:],
        }
    except Exception as e:
        print(f"书籍信息获取失败: {str(e)}")
        return None
def enhanced_downloader(args):
    """Download a single chapter and return ``(index, title, text)``.

    *args* is the ``(chapter_tag, session, index)`` tuple submitted by
    ``main``. On failure the text slot contains a bracketed Chinese error
    marker (e.g. 【HTTP错误:…】) instead of chapter content, which is how
    the caller distinguishes success from failure.
    """
    chapter, session, index = args
    title = chapter.text.strip()

    # Resolve relative chapter links against the site root.
    href = chapter['href']
    url = href if href.startswith('http') else f"https://www.paozww.com{href}"

    # Randomized pause (exponential, mean 1.5 s) to avoid hammering the site.
    time.sleep(random.expovariate(1 / 1.5))

    try:
        headers = dict(HEADERS)
        headers["Referer"] = BOOK_URL
        # Occasionally (~30% of requests) masquerade as an AJAX call.
        headers["X-Requested-With"] = "XMLHttpRequest" if random.random() > 0.7 else ""

        response = session.get(url, headers=headers, timeout=25)
        if response.status_code != 200:
            return (index, title, f"【HTTP错误:{response.status_code}】")

        response.encoding = 'gb18030'
        soup = BeautifulSoup(response.text, 'html.parser')
        content_div = soup.find('div', id='content')
        if not content_div:
            return (index, title, "【内容结构异常】")

        paragraphs = []
        for node in content_div.contents:
            # Skip alert boxes the site injects into the chapter body.
            # (The name check also guards: .get() is only reached on Tags.)
            if node.name == 'div' and 'alert' in node.get('class', []):
                continue
            text = node.get_text(strip=True)
            # Keep only substantive lines; very short fragments are noise.
            if text and len(text) > 10:
                paragraphs.append(f" {text}\n")
        return (index, title, ''.join(paragraphs))
    except Exception as e:
        return (index, title, f"【系统错误:{str(e)}】")
def main():
    """Entry point: download every chapter concurrently and assemble one
    UTF-8 text file, chapters in reading order.

    Two fixes over the previous version:
      * Results are buffered and written sorted by chapter index.
        (``as_completed`` yields in completion order, so streaming writes
        produced a shuffled book.)
      * The success counter is incremented *before* the progress line is
        printed; previously the displayed count always lagged by one.
    """
    print("正在初始化...")
    book_info = get_book_info()
    if not book_info:
        return

    os.makedirs(DOWNLOAD_PATH, exist_ok=True)
    filename = f"{book_info['title']} - {book_info['author']}.txt"
    filepath = os.path.join(DOWNLOAD_PATH, filename)

    total = len(book_info['chapter_list'])
    print(f"\n开始下载:{book_info['title']}")
    print(f"章节总数:{total}")

    results = {}  # chapter index -> (title, content)
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for idx, ch in enumerate(book_info['chapter_list']):
            # Throttle submission bursts: pause every 10 chapters.
            if idx % 10 == 0:
                time.sleep(5)
            futures.append(
                executor.submit(enhanced_downloader, (ch, book_info['session'], idx))
            )

        success = 0
        for i, future in enumerate(as_completed(futures), 1):
            idx, title, content = future.result()
            results[idx] = (title, content)
            # A bracketed marker in the content signals a failed chapter.
            if "【" in content:
                status = "✗"
            else:
                status = "✓"
                success += 1
            progress = i / total * 100
            print(f"\r[{status}] 进度: {progress:.1f}% | 成功: {success}", end='')

    # Write the book only after all downloads finish, sorted by index so
    # chapters appear in reading order regardless of completion order.
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(f"《{book_info['title']}》\n作者:{book_info['author']}\n\n")
        for idx in sorted(results):
            title, content = results[idx]
            f.write(f"\n\n第{idx+1}章 {title}\n{content}")

    print(f"\n\n下载完成!保存路径:{filepath}")
# Run the downloader only when executed as a script, not on import.
if __name__ == "__main__":
    main()