[Discussion] A crawler script written with the help of 通义千问; large models really are getting more and more capable

iprogramer posted on 2024-6-26 22:31
This post was last edited by iprogramer on 2024-6-26 22:39

[Python]
import requests
from bs4 import BeautifulSoup
import os
import re
import base64
import logging

html_template = """
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>{title}</title>
    <style>{style}
    </style>    
</head>
<body class='container'>
{content}
</body>

</html>
"""

style = """
    body {
        font-family: Arial, sans-serif;
    }
    .container {
        width: 700px;
        margin: 0 auto; /* 居中 */
        margin-top:30px;
    }
    .container img { /* 添加这一段来让图片水平居中 */
        display: block; /* 将图片视为块级元素以便应用margin */
        margin: 0 auto; /* 实现水平居中 */
        max-width: 60%; /* 确保图片不超过容器宽度 */
        height: auto; /* 保持图片原始宽高比 */
    }    
    .container h1 {
        font-size:36px;
        margin: 0 auto; /* 居中 */
    }
"""

def setup_logger():
    # Configure logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class WebScraper:
    def __init__(self, start_id=0):
        self.start_id = start_id
        self.last_id_path = "last_id.txt"

    def load_last_id(self):
        """加载上次处理的ID"""
        if os.path.exists(self.last_id_path):
            with open(self.last_id_path, 'r') as file:
                return int(file.read().strip())
        return self.start_id

    def save_last_id(self, last_id):
        """Save the last processed ID."""
        with open(self.last_id_path, "w") as file:
            file.write(str(last_id))

    @staticmethod
    def make_request(url):
        """Send an HTTP request, with basic error handling."""
        try:
            response = requests.get(url, timeout=10)  # timeout so one dead host cannot stall the crawl
            response.raise_for_status()  # raise on HTTP error codes
            return response
        except requests.RequestException as e:
            logging.error(f"Request error: {e}")
            return None

    @staticmethod
    def sanitize_filename(file_name):
        # Strip characters that are illegal in file names
        return re.sub(r'[^\w\s-]', '', file_name).strip().replace(' ', '_')

    def parse_webpage(self, response):
        """Parse the page and extract the title, body, and publication date."""
        soup = BeautifulSoup(response.text, 'html.parser')
        title_tag = soup.find('h1')
        title = title_tag.text.strip() if title_tag else None  # guard: some IDs are not article pages
        article_body = soup.find(class_='article-body')
        pub_date_tag = soup.find(class_='time')
        pub_date = pub_date_tag.text.strip().replace("-", "") if pub_date_tag else ""
        return title, article_body, pub_date

    def download_and_encode_image(self, img_url):
        """Download an image and return it as a Base64 data URI."""
        response = self.make_request(img_url)
        if response:
            encoded = base64.b64encode(response.content).decode('utf-8')
            # The data: prefix is required for the browser to render the embedded image.
            # image/jpeg is an assumption; derive the type from the Content-Type header if needed.
            return f"data:image/jpeg;base64,{encoded}"
        return None


    def embed_images_in_html(self, soup, images_base64):
        # Replace each remote img src with its Base64 data URI
        for img in soup.find_all('img'):
            img_src = img.get('src')
            if img_src and img_src.startswith(('http:', 'https:')):
                base64_img = images_base64.get(img_src)
                if base64_img:
                    img['src'] = base64_img

    def scrape_and_process(self, url):
        """Run the full pipeline for a single page."""
        response = self.make_request(url)
        if not response:
            return

        title, article_body, pub_date = self.parse_webpage(response)
        if not title or article_body is None:
            return  # not an article page; skip it
        title_sanitized = self.sanitize_filename(title)

        images_base64 = {img.get('src'): self.download_and_encode_image(img.get('src'))
                         for img in article_body.find_all('img')
                         if img.get('src') and img.get('src').startswith(('http:', 'https:'))}

        self.embed_images_in_html(article_body, images_base64)

        html_body = html_template.format(title=title_sanitized, content=str(article_body), style=style)

        author = '安全内参'
        file_name = f"[{author}] - {pub_date} - {title_sanitized}.html"
        year, month, day = pub_date[:4], pub_date[4:6], pub_date[6:8]
        folder_to_save = "D:\\微信文件"
        # os.path.join avoids the backslash-escape pitfalls of building Windows paths in an f-string
        target_dir = os.path.join(folder_to_save, author, year, month, day)
        os.makedirs(target_dir, exist_ok=True)  # exist_ok skips the existence check
        f_name = os.path.join(target_dir, file_name)

        try:
            with open(f_name, 'w', encoding='utf-8') as file:
                file.write(html_body)
            logging.info(f'Article saved to: {f_name}')
        except IOError as e:
            logging.error(f'Error while saving the file: {e}')

    def run(self):
        """Main loop: walk a range of article IDs."""
        last_id = self.load_last_id()
        for i in range(last_id, 90000):
            url = f'https://www.secrss.com/articles/{i}'
            print(f"Processing: {url}")
            self.scrape_and_process(url)
            self.save_last_id(i + 1)


if __name__ == '__main__':
    setup_logger()
    scraper = WebScraper()
    scraper.run()


OP | iprogramer posted on 2024-6-26 22:31
I've only been learning Python for half a month. Hoping someone more experienced can turn this into a multithreaded version; it crawls far too slowly.
andyop posted on 2024-6-26 23:19
iprogramer posted on 2024-6-26 22:31
I've only been learning Python for half a month. Hoping someone more experienced can turn this into a multithreaded version; it crawls far too slowly.

grequests
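
(For reference: grequests pairs requests with gevent greenlets so a batch of URLs can be fetched concurrently. A minimal sketch of how it might replace the serial loop; the ID range and concurrency size below are illustrative assumptions, not values from the thread.)

[Python]
import grequests  # import before requests so gevent can patch sockets early

# Lazily build request objects for a slice of article IDs (range is illustrative)
urls = (f'https://www.secrss.com/articles/{i}' for i in range(0, 100))
pending = (grequests.get(u, timeout=10) for u in urls)

# size caps how many requests are in flight at once;
# imap yields responses as they complete, not in submission order
for response in grequests.imap(pending, size=5):
    if response is not None and response.ok:
        print(f"Fetched {response.url}")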
OP | iprogramer posted on 2024-6-26 23:24
[Python]
import requests
from bs4 import BeautifulSoup
import base64
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import re
import logging
import os

html_template = """
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>{title}</title>
    <style>{style}
    </style>    
</head>
<body class='container'>
{content}
</body>

</html>
"""

style = """
    body {
        font-family: Arial, sans-serif;
    }
    .container {
        width: 700px;
        margin: 0 auto; /* 居中 */
        margin-top:30px;
    }
    .container img { /* 添加这一段来让图片水平居中 */
        display: block; /* 将图片视为块级元素以便应用margin */
        margin: 0 auto; /* 实现水平居中 */
        max-width: 60%; /* 确保图片不超过容器宽度 */
        height: auto; /* 保持图片原始宽高比 */
    }    
    .container h1 {
        font-size:36px;
        margin: 0 auto; /* 居中 */
    }
"""

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def download_image_to_base64(url: str) -> str:
    """Download an image and return it as a Base64 data URI."""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        encoded_image = base64.b64encode(response.content).decode('utf-8')
        # image/jpeg is assumed; check the Content-Type header if exactness matters
        return f"data:image/jpeg;base64,{encoded_image}"
    except requests.RequestException as e:
        logging.error(f"Network request error: {e}")
        return None

def save_progress(i: int) -> None:
    """保存进度到文件"""
    Path("last_id.txt").write_text(str(i))

def load_progress() -> int:
    """从文件加载进度"""
    if Path("last_id.txt").exists():
        return int(Path("last_id.txt").read_text())
    return 0

def sanitize_filename(filename: str) -> str:
    """清理文件名中的非法字符"""
    return re.sub(r'[^\w\s\.-]', '', filename).strip().replace('_', ' ').replace('.', '_')

def scrape_and_embed_images(webpage_url: str) -> None:
    """Fetch a page, embed its images as Base64 data URIs, and save the HTML."""
    try:
        response = requests.get(webpage_url, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        # Image links are assumed to live in the img tags' src attributes
        images = {img.get('src'): download_image_to_base64(img.get('src'))
                  for img in soup.find_all('img', src=True)
                  if img.get('src').startswith(('http:', 'https:'))}

        for img_src, base64_img in images.items():
            if base64_img:
                for img in soup.find_all('img', src=img_src):
                    img['src'] = base64_img

        title = soup.find('h1')
        article_body = soup.find(class_='article-body')
        if title is None or article_body is None:
            return  # not an article page; skip it

        pub_date = soup.find(class_='time').text.strip().replace("-", "")
        title = sanitize_filename(title.text.strip())
        html_body = html_template.format(title=title, content=article_body, style=style)
        print(f'Fetched OK, processing: {pub_date} - {title}')
        # Save the modified HTML to a local file
        author = '安全内参'
        file_name = f"[{author}] - {pub_date} - {title}.html"
        year, month, day = pub_date[:4], pub_date[4:6], pub_date[6:8]
        folder_to_save = "D:\\微信文件"
        # os.path.join avoids backslash-escape pitfalls in f-string paths
        target_dir = os.path.join(folder_to_save, author, year, month, day)
        os.makedirs(target_dir, exist_ok=True)  # exist_ok skips the existence check
        f_name = os.path.join(target_dir, file_name)
        with open(f_name, 'w', encoding='utf-8') as file:
            file.write(html_body)
    except requests.HTTPError as e:
        logging.error(f"Page request failed: {e}")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")

def main() -> None:
    """Entry point: multithreaded crawling with progress saving."""
    last_id = load_progress()
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(scrape_and_embed_images, f'https://www.secrss.com/articles/{article_id}'): article_id
                   for article_id in range(last_id, 90000)}
        for future in futures:
            try:
                future.result()  # wait for the task to finish; re-raises any exception
            except Exception:
                logging.error(f"Exception while processing ID {futures[future]}")
            # Futures can finish out of order, so this marker is only approximate
            save_progress(futures[future])

if __name__ == '__main__':
    main()
OP | iprogramer posted on 2024-6-26 23:26
I pasted the code above into Kimi and had it implement the multithreading; it actually ran on the first try. Large models are amazing.
long9788523 posted on 2024-6-27 08:07
True, the large model understands your code better than you do
Wapj_Wolf posted on 2024-6-27 08:16
In the future, programming will only require knowing the basics; AI can handle the rest
liuhaigang12 posted on 2024-6-27 08:21
It only handles straightforward requirements; anything heavily customized is still hard
willgoon posted on 2024-6-27 08:36
It does fine with simple requirements
ciker_li posted on 2024-6-27 08:46
How did you phrase your prompts?
Could you show the process?