The code below was untested at first; after testing there were a few problems, see https://www.52pojie.cn/thread-1633562-1-1.html. (Testing is done now and it basically works after some small changes. Pointers from the experts are welcome; I still feel the code is a bit rough.)
# //div[@class="item_list infinite_scroll masonry"]/div[img_num]//a/img/@alt 壁纸标题
# //div[@class="item_list infinite_scroll masonry"]/div[img_num]//a/img/@src 壁纸图片地址
# 总页数440页 https://www.mmonly.cc/gqbz/list_41_[page_num].html 页数page_num从1开始到440
# //div[@class="topmbx"]/a[last()]/text() 壁纸类型
# //div[@id="big-pic"]//a/@href 壁纸高清地址
import aiofiles
import aiohttp
import asyncio
import async_timeout
import os
import time
from collections import namedtuple
from typing import List, Text

from fake_useragent import UserAgent
from lxml import etree
from rich.console import Console

console = Console()
# aiohttp expects a header mapping, not a bare User-Agent string
headers = {'User-Agent': UserAgent().random}
Img_url_name = namedtuple('Img_url_name', ['img_url', 'img_name'])
Img_big_url_type = namedtuple('Img_big_url_type', ['img_big_url', 'img_type'])
async def get_html(url) -> Text:
    """Fetch the HTML source of a page."""
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(10):
            async with session.get(url, headers=headers) as resp:
                return await resp.text()
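# Design note: opening a new ClientSession for every request works, but the
# aiohttp docs recommend reusing one session. A minimal sketch of that style
# (get_html_shared is a hypothetical name, not wired into main below; the
# caller would create the session once and pass it in):
async def get_html_shared(session: aiohttp.ClientSession, url) -> Text:
    """Like get_html, but reuses a caller-owned ClientSession."""
    async with async_timeout.timeout(10):
        async with session.get(url, headers=headers) as resp:
            return await resp.text()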
async def save_img(img_url, img_name) -> None:
    """Download one image and write it to disk."""
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(10):
            async with session.get(img_url, headers=headers) as resp:
                img = await resp.read()
                async with aiofiles.open(img_name, 'wb') as f:
                    await f.write(img)
                console.print(f'[yellow]{img_name} saved!')
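# resp.read() buffers the whole image in memory, which is fine for wallpapers
# but scales poorly. A hedged streaming variant (save_img_chunked is a
# hypothetical name, not used by main below):
async def save_img_chunked(img_url, img_name) -> None:
    """Stream an image to disk in 64 KiB chunks instead of buffering it whole."""
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(30):
            async with session.get(img_url, headers=headers) as resp:
                async with aiofiles.open(img_name, 'wb') as f:
                    async for chunk in resp.content.iter_chunked(64 * 1024):
                        await f.write(chunk)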
def get_img_url_name(resp_text) -> List:
    """Extract the detail-page URL and title of every thumbnail on a list page."""
    tree = etree.HTML(resp_text)
    # Each list page carries 24 thumbnails. XPath positions are 1-based and need
    # brackets (div[1], div[2], ...); the original f-string produced div1, which
    # matches nothing. The <a> around each thumbnail links to the detail page,
    # so take its @href rather than the thumbnail's @src (xpath() returns a
    # list, hence the [0]).
    img_url_name = [
        Img_url_name(
            img_url=tree.xpath(f'//div[@class="item_list infinite_scroll masonry"]/div[{num}]//a/@href')[0],
            img_name=tree.xpath(f'//div[@class="item_list infinite_scroll masonry"]/div[{num}]//a/img/@alt')[0],
        )
        for num in range(1, 25)
    ]
    return img_url_name
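# A tiny self-check of the selector shape (the demo HTML is a minimal stand-in
# for the real page, not actual site markup):
#   >>> demo = ('<div class="item_list infinite_scroll masonry"><div>'
#   ...         '<a href="https://example.com/1.html">'
#   ...         '<img src="t.jpg" alt="Demo Title"></a></div></div>')
#   >>> t = etree.HTML(demo)
#   >>> t.xpath('//div[@class="item_list infinite_scroll masonry"]/div[1]//a/@href')
#   ['https://example.com/1.html']
#   >>> t.xpath('//div[@class="item_list infinite_scroll masonry"]/div[1]//a/img/@alt')
#   ['Demo Title']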
def get_big_img(resp_text) -> Img_big_url_type:
    """Extract the full-size image URL and wallpaper category from a detail page."""
    tree = etree.HTML(resp_text)
    # xpath() returns lists even for single matches, so unwrap the first hit;
    # an empty result raises IndexError, which the caller uses to stop paging
    img_big_url_type = Img_big_url_type(
        img_big_url=tree.xpath('//div[@id="big-pic"]//a/@href')[0],
        img_type=tree.xpath('//div[@class="topmbx"]/a[last()]/text()')[0],
    )
    return img_big_url_type
def mkdir(path) -> bool:
    """Create the target directory if it does not already exist."""
    # exist_ok avoids a race, and returning True unconditionally fixes a bug:
    # the original returned None for a directory that already existed, so every
    # image after the first in a folder was silently skipped
    os.makedirs(path, exist_ok=True)
    return True
async def main():
    """Walk all 440 list pages, then every wallpaper and its sub-pages."""
    start_time = time.time()
    # Everything below is awaited in sequence, so the original
    # `with asyncio.Semaphore(5)` limited nothing (and a semaphore needs
    # `async with` anyway); see the concurrent sketch at the bottom
    for num in range(1, 441):
        # 440 list pages in total (the original URL had a stray leading space)
        url = f'https://www.mmonly.cc/gqbz/list_41_{num}.html'
        resp_text = await get_html(url)
        get_img_url_name_list = get_img_url_name(resp_text)
        for img_url_name in get_img_url_name_list:
            resp_text_big_img = await get_html(img_url_name.img_url)
            # image counter starts at 1
            img_num = 1
            # sub-page counter starts at 1
            page_num = 1
            while True:
                try:
                    if page_num >= 2:
                        resp_text_big_img = await get_html(next_img_big_url)
                    img_big_url_type = get_big_img(resp_text_big_img)
                    save_dir = os.path.join(img_big_url_type.img_type, img_url_name.img_name)
                    if mkdir(save_dir):
                        # save into the per-wallpaper folder; the original wrote to the CWD
                        await save_img(img_big_url_type.img_big_url,
                                       os.path.join(save_dir, f'{img_url_name.img_name}_{img_num}.jpg'))
                        img_num += 1
                        await asyncio.sleep(1)
                    page_num += 1
                    # sub-pages follow .../12345.html -> .../12345_2.html; str() is
                    # needed because the original concatenated an int into the URL
                    next_img_big_url = ".".join(img_url_name.img_url.split(".")[:-1]) + "_" + str(page_num) + ".html"
                except Exception:
                    # running past the last sub-page raises IndexError (empty XPath)
                    # or a fetch error; unlike a bare `except:`, this still lets
                    # KeyboardInterrupt stop the crawl
                    break
    console.print(f'[green]All done in {time.time() - start_time:.1f}s')
if __name__ == '__main__':
    # asyncio.run() replaces the deprecated get_event_loop()/run_until_complete pair
    asyncio.run(main())
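# The semaphore idea from the original only pays off once several tasks run at
# the same time. A minimal sketch of bounded concurrency over the list pages
# (crawl_list_page and main_concurrent are hypothetical names for an alternative
# entry point, not the author's flow; the per-wallpaper work from main would
# move inside crawl_list_page):
sem = asyncio.Semaphore(5)

async def crawl_list_page(num) -> None:
    """Process one list page while holding the semaphore."""
    async with sem:  # at most 5 list pages in flight at once
        resp_text = await get_html(f'https://www.mmonly.cc/gqbz/list_41_{num}.html')
        thumbs = get_img_url_name(resp_text)
        console.print(f'[cyan]list page {num}: {len(thumbs)} thumbnails')

async def main_concurrent() -> None:
    """Fan the 440 list pages out with gather() instead of a serial loop."""
    await asyncio.gather(*(crawl_list_page(n) for n in range(1, 441)))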