吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 811|回复: 18
收起左侧

[求助] python代码问题

[复制链接]
shiyun01 发表于 2024-5-28 22:04
哪位大佬帮我看看这代码啥问题,用al写的准备做个检查复制的链接中可用链接有效性的,现在卡在这了,不知道啥问题。
[Python] 纯文本查看 复制代码
import re
import asyncio
import aiohttp
from aiohttp.client_exceptions import ClientError
import tkinter as tk
from tkinter import scrolledtext, simpledialog, messagebox, Toplevel, ttk
import webbrowser
from bs4 import BeautifulSoup
from selenium import webdriver
from msedge.selenium_tools import Edge, EdgeOptions
from webdriver_manager.microsoft import EdgeChromiumDriverManager
import requests_cache
import os

# Default invalid keywords
default_invalid_keywords = ["文件不存在", "已被删除", "Not Found", "404", "拒绝连接", "无法访问", "取消分享", "文件打不开"]
keyword_vars = {}

# Threshold for considering a page as "blank" or "unresponsive"
BLANK_PAGE_THRESHOLD = 200

# Global variable to control cancellation
cancel_check = False

# Cache for link results
cache_db_path = os.path.join(os.path.expanduser("~"), "link_cache")
try:
    requests_cache.install_cache(cache_db_path, backend='sqlite', expire_after=3600)
except (requests_cache.backends.sqlite.SQLiteError, requests_cache.backends.base.BaseCacheError) as e:
    print(f"Failed to initialize cache: {e}")
    cache_db_path = None

# Configure Edge
options = EdgeOptions()
options.use_chromium = True
options.add_argument('headless')
options.add_argument('disable-gpu')

# Create a global WebDriver instance
driver = None

def initialize_driver():
    global driver
    if driver is None:
        driver = Edge(EdgeChromiumDriverManager().install(), options=options)

def close_driver():
    global driver
    if driver is not None:
        driver.quit()
        driver = None

def extract_links(text):
    """Extracts all links from the given text."""
    link_pattern = re.compile(r'(https?://\S+)')
    links = link_pattern.findall(text)
    return links

async def check_link_validity(link, mode):
    """Checks if a link is valid according to the selected mode."""
    session = requests_cache.CachedSession(cache_db_path)
    
    try:
        if mode == "快速筛选":
            result = await quick_check(session, link)
        elif mode == "正常筛选":
            result = await normal_check(session, link)
        else:  # 仔细筛选
            result = await detailed_check(session, link)
    except (ClientError, asyncio.TimeoutError):
        result = False

    return result

async def quick_check(session, link):
    """Quickly check if a link is reachable using a HEAD request."""
    async with session.head(link, allow_redirects=True, timeout=5) as response:
        return response.status < 400

async def normal_check(session, link):
    """Check link validity using a GET request and keyword/content analysis."""
    async with session.get(link, allow_redirects=True, timeout=10) as response:
        if response.status >= 400:
            return False

        page_content = await response.text()
        active_keywords = [kw for kw, var in keyword_vars.items() if var.get()]

        if any(keyword in page_content for keyword in active_keywords):
            return False
        if len(page_content) < BLANK_PAGE_THRESHOLD:
            return False

        return True

async def detailed_check(session, link):
    """Check link validity with advanced techniques including JavaScript rendering."""
    async with session.get(link, allow_redirects=True, timeout=30) as response:
        if response.status >= 400:
            return False

        page_content = await response.text()
        active_keywords = [kw for kw, var in keyword_vars.items() if var.get()]

        if any(keyword in page_content for keyword in active_keywords):
            return False
        if len(page_content) < BLANK_PAGE_THRESHOLD:
            return False

        # Use Selenium for JavaScript rendering
        initialize_driver()
        try:
            driver.get(link)
            await asyncio.sleep(5)  # Give some time for the page to fully load
            page_content = driver.page_source
            driver.delete_all_cookies()
        except Exception as e:
            print(f"Selenium error: {e}")
            return False

        if not check_html_structure(page_content):
            return False

        return True

def check_html_structure(content):
    """Checks if the HTML structure is valid by looking for specific tags."""
    soup = BeautifulSoup(content, 'html.parser')
    return soup.title is not None or soup.h1 is not None

def open_link(event):
    """Opens the link in a web browser."""
    widget = event.widget
    index = widget.index(tk.CURRENT)
    line_index = int(index.split('.')[0])
    link = widget.get(f"{line_index}.0", f"{line_index}.end")
    if link.startswith('http'):
        webbrowser.open(link)

async def check_links():
    global cancel_check
    cancel_check = False
    check_button.config(state=tk.DISABLED)
    cancel_button.config(state=tk.NORMAL)

    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, "正在检查链接,请稍候...\n")
    result_area.config(state=tk.DISABLED)

    text = text_area.get("1.0", tk.END)
    links = extract_links(text)

    valid_links = []
    invalid_links = []

    mode = mode_var.get()
    total_links = len(links)

    async def check_link(link, idx):
        is_valid = await check_link_validity(link, mode)
        if is_valid:
            valid_links.append(link)
        else:
            invalid_links.append(link)
        progress_var.set(int((idx + 1) / total_links * 100))

    small_threads = small_threads_var.get()

    # Limit the number of concurrent tasks
    semaphore = asyncio.Semaphore(10 if small_threads else 100)

    async def sem_check_link(link, idx):
        async with semaphore:
            await check_link(link, idx)

    tasks = [sem_check_link(link, idx) for idx, link in enumerate(links)]
    await asyncio.gather(*tasks)

    if not cancel_check:
        show_results(valid_links, invalid_links)
    else:
        show_cancel_message()

    check_button.config(state=tk.NORMAL)
    cancel_button.config(state=tk.DISABLED)

def show_results(valid_links, invalid_links):
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)

    result_area.insert(tk.INSERT, "有效链接:\n")
    for link in valid_links:
        result_area.insert(tk.INSERT, f"{link}\n", "link")

    result_area.insert(tk.INSERT, "\n无效链接:\n")
    for link in invalid_links:
        result_area.insert(tk.INSERT, f"{link}\n")

    result_area.tag_configure("link", foreground="blue", underline=True)
    result_area.tag_bind("link", "<Button-1>", open_link)
    result_area.config(state=tk.DISABLED)

def clear_text():
    text_area.delete("1.0", tk.END)
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.config(state=tk.DISABLED)
    progress_var.set(0)

def cancel_checking():
    global cancel_check
    cancel_check = True

def show_cancel_message():
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, "操作已取消。\n")
    result_area.config(state=tk.DISABLED)

def open_settings():
    settings_window = Toplevel(root)
    settings_window.title("设置筛选词")

    tk.Label(settings_window, text="筛选词列表:").pack()

    keywords_frame = tk.Frame(settings_window)
    keywords_frame.pack(padx=10, pady=10)

    def update_keywords_display():
        for widget in keywords_frame.winfo_children():
            widget.destroy()
        row, col = 0, 0
        for keyword, var in keyword_vars.items():
            cb = tk.Checkbutton(keywords_frame, text=keyword, variable=var)
            cb.grid(row=row, column=col, padx=5, pady=5, sticky='w')
            cb.bind("<Button-3>", lambda e, kw=keyword: confirm_remove_keyword(kw))
            col += 1
            if col >= 8:
                col = 0
                row += 1

    def add_keyword():
        new_keyword = simpledialog.askstring("添加筛选词", "请输入新的筛选词:", parent=settings_window)
        if new_keyword and new_keyword not in keyword_vars:
            var = tk.BooleanVar(value=True)
            keyword_vars[new_keyword] = var
            update_keywords_display()

    def confirm_remove_keyword(keyword):
        if messagebox.askokcancel("确认删除", f"你确定要删除筛选词 '{keyword}' 吗?"):
            remove_keyword(keyword)

    def remove_keyword(keyword):
        if keyword in keyword_vars:
            del keyword_vars[keyword]
            update_keywords_display()

    add_keyword_button = tk.Button(settings_window, text="添加筛选词", command=add_keyword)
    add_keyword_button.pack(pady=5)

    update_keywords_display()

    save_button = tk.Button(settings_window, text="关闭", command=settings_window.destroy)
    save_button.pack(pady=5)

def initialize_keywords():
    global keyword_vars
    for keyword in default_invalid_keywords:
        var = tk.BooleanVar(value=True)
        keyword_vars[keyword] = var

root = tk.Tk()
root.title("链接有效性检查工具")

main_frame = tk.Frame(root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

left_frame = tk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

right_frame = tk.Frame(main_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

tk.Label(left_frame, text="请输入包含链接的文本:").pack()

text_area = scrolledtext.ScrolledText(left_frame, wrap=tk.WORD, width=50, height=20)
text_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)

# Add mode selection
mode_var = tk.StringVar(value="正常筛选")
tk.Label(left_frame, text="选择筛选模式:").pack()
modes = ["快速筛选", "正常筛选", "仔细筛选"]
for mode in modes:
    rb = tk.Radiobutton(left_frame, text=mode, variable=mode_var, value=mode)
    rb.pack(anchor='w')

# Add small threads option
small_threads_var = tk.BooleanVar(value=False)
small_threads_checkbutton = tk.Checkbutton(left_frame, text="启用小线程模式", variable=small_threads_var)
small_threads_checkbutton.pack(anchor='w')

check_button = tk.Button(left_frame, text="检查链接", command=lambda: asyncio.run(check_links()))
check_button.pack(pady=5)

cancel_button = tk.Button(left_frame, text="取消", command=cancel_checking, state=tk.DISABLED)
cancel_button.pack(pady=5)

clear_button = tk.Button(left_frame, text="清除文本", command=clear_text)
clear_button.pack(pady=5)

settings_button = tk.Button(left_frame, text="设置", command=open_settings)
settings_button.pack(pady=5)

tk.Label(right_frame, text="结果:").pack()

result_area = scrolledtext.ScrolledText(right_frame, wrap=tk.WORD, width=50, height=20)
result_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
result_area.config(state=tk.DISABLED)
result_area.bind("<Button-1>", open_link)

# Add progress bar
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(left_frame, orient="horizontal", length=400, mode="determinate", variable=progress_var)
progress_bar.pack(pady=5)

# Initialize keyword variables
initialize_keywords()

# Ensure WebDriver is closed on exit
def on_closing():
    close_driver()
    root.destroy()

root.protocol("WM_DELETE_WINDOW", on_closing)

# Run the main loop
root.mainloop()

麻烦各位大佬了
{207DD119-43A8-4c28-A9EC-030F27EE56EF}.png

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

我心飞翔1995 发表于 2024-5-29 01:55
所以到底有什么问题,你是不会写了还是程序执行有问题?
adx123456 发表于 2024-5-29 07:13
哥们   你写函数不带注释,我们短时间内是不知道你要干什么的。不写注释,你自己能懂
hualy 发表于 2024-5-29 09:21
adx123456 发表于 2024-5-29 07:13
哥们   你写函数不带注释,我们短时间内是不知道你要干什么的。不写注释,你自己能懂

人家有注释的,只不过是英文
ttgjyie 发表于 2024-5-29 09:25
人家都说了是AI给写的,所以英文注释就很正常了
Hellsing 发表于 2024-5-29 09:31
本帖最后由 Hellsing 于 2024-5-29 09:33 编辑

[Python] 纯文本查看 复制代码
import re
import asyncio
import aiohttp
from aiohttp.client_exceptions import ClientError
import tkinter as tk
from tkinter import scrolledtext, simpledialog, messagebox, Toplevel, ttk
import webbrowser
from bs4 import BeautifulSoup
from selenium import webdriver
from msedge.selenium_tools import Edge, EdgeOptions
from webdriver_manager.microsoft import EdgeChromiumDriverManager
import os

# Default invalid keywords
default_invalid_keywords = ["文件不存在", "已被删除", "Not Found", "404", "拒绝连接", "无法访问", "取消分享", "文件打不开"]
keyword_vars = {}

# Threshold for considering a page as "blank" or "unresponsive"
BLANK_PAGE_THRESHOLD = 200

# Global variable to control cancellation
cancel_check = False

# Configure Edge
options = EdgeOptions()
options.use_chromium = True
options.add_argument('headless')
options.add_argument('disable-gpu')

# Create a global WebDriver instance
driver = None

def initialize_driver():
    global driver
    if driver is None:
        driver = Edge(EdgeChromiumDriverManager().install(), options=options)

def close_driver():
    global driver
    if driver is not None:
        driver.quit()
        driver = None

def extract_links(text):
    """Extracts all links from the given text."""
    link_pattern = re.compile(r'(https?://\S+)')
    links = link_pattern.findall(text)
    return links

async def check_link_validity(link, mode):
    """Checks if a link is valid according to the selected mode."""
    async with aiohttp.ClientSession() as session:
        try:
            if mode == "快速筛选":
                result = await quick_check(session, link)
            elif mode == "正常筛选":
                result = await normal_check(session, link)
            else:  # 仔细筛选
                result = await detailed_check(session, link)
        except (ClientError, asyncio.TimeoutError):
            result = False

    return result

async def quick_check(session, link):
    """Quickly check if a link is reachable using a HEAD request."""
    async with session.head(link, allow_redirects=True, timeout=5) as response:
        return response.status < 400

async def normal_check(session, link):
    """Check link validity using a GET request and keyword/content analysis."""
    async with session.get(link, allow_redirects=True, timeout=10) as response:
        if response.status >= 400:
            return False

        page_content = await response.text()
        active_keywords = [kw for kw, var in keyword_vars.items() if var.get()]

        if any(keyword in page_content for keyword in active_keywords):
            return False
        if len(page_content) < BLANK_PAGE_THRESHOLD:
            return False

        return True

async def detailed_check(session, link):
    """Check link validity with advanced techniques including JavaScript rendering."""
    async with session.get(link, allow_redirects=True, timeout=30) as response:
        if response.status >= 400:
            return False

        page_content = await response.text()
        active_keywords = [kw for kw, var in keyword_vars.items() if var.get()]

        if any(keyword in page_content for keyword in active_keywords):
            return False
        if len(page_content) < BLANK_PAGE_THRESHOLD:
            return False

        # Use Selenium for JavaScript rendering
        initialize_driver()
        try:
            driver.get(link)
            await asyncio.sleep(5)  # Give some time for the page to fully load
            page_content = driver.page_source
            driver.delete_all_cookies()
        except Exception as e:
            print(f"Selenium error: {e}")
            return False

        if not check_html_structure(page_content):
            return False

        return True

def check_html_structure(content):
    """Checks if the HTML structure is valid by looking for specific tags."""
    soup = BeautifulSoup(content, 'html.parser')
    return soup.title is not None or soup.h1 is not None

def open_link(event):
    """Opens the link in a web browser."""
    widget = event.widget
    index = widget.index(tk.CURRENT)
    line_index = int(index.split('.')[0])
    link = widget.get(f"{line_index}.0", f"{line_index}.end")
    if link.startswith('http'):
        webbrowser.open(link)

async def check_links():
    global cancel_check
    cancel_check = False
    check_button.config(state=tk.DISABLED)
    cancel_button.config(state=tk.NORMAL)

    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, "正在检查链接,请稍候...\n")
    result_area.config(state=tk.DISABLED)

    text = text_area.get("1.0", tk.END)
    links = extract_links(text)

    valid_links = []
    invalid_links = []

    mode = mode_var.get()
    total_links = len(links)

    async def check_link(link, idx):
        is_valid = await check_link_validity(link, mode)
        if is_valid:
            valid_links.append(link)
        else:
            invalid_links.append(link)
        progress_var.set(int((idx + 1) / total_links * 100))

    small_threads = small_threads_var.get()

    # Limit the number of concurrent tasks
    semaphore = asyncio.Semaphore(10 if small_threads else 100)

    async def sem_check_link(link, idx):
        async with semaphore:
            await check_link(link, idx)

    tasks = [sem_check_link(link, idx) for idx, link in enumerate(links)]
    await asyncio.gather(*tasks)

    if not cancel_check:
        show_results(valid_links, invalid_links)
    else:
        show_cancel_message()

    check_button.config(state=tk.NORMAL)
    cancel_button.config(state=tk.DISABLED)

def show_results(valid_links, invalid_links):
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)

    result_area.insert(tk.INSERT, "有效链接:\n")
    for link in valid_links:
        result_area.insert(tk.INSERT, f"{link}\n", "link")

    result_area.insert(tk.INSERT, "\n无效链接:\n")
    for link in invalid_links:
        result_area.insert(tk.INSERT, f"{link}\n")

    result_area.tag_configure("link", foreground="blue", underline=True)
    result_area.tag_bind("link", "<Button-1>", open_link)
    result_area.config(state=tk.DISABLED)

def clear_text():
    text_area.delete("1.0", tk.END)
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.config(state=tk.DISABLED)
    progress_var.set(0)

def cancel_checking():
    global cancel_check
    cancel_check = True

def show_cancel_message():
    result_area.config(state=tk.NORMAL)
    result_area.delete("1.0", tk.END)
    result_area.insert(tk.INSERT, "操作已取消。\n")
    result_area.config(state=tk.DISABLED)

def open_settings():
    settings_window = Toplevel(root)
    settings_window.title("设置筛选词")

    tk.Label(settings_window, text="筛选词列表:").pack()

    keywords_frame = tk.Frame(settings_window)
    keywords_frame.pack(padx=10, pady=10)

    def update_keywords_display():
        for widget in keywords_frame.winfo_children():
            widget.destroy()
        row, col = 0, 0
        for keyword, var in keyword_vars.items():
            cb = tk.Checkbutton(keywords_frame, text=keyword, variable=var)
            cb.grid(row=row, column=col, padx=5, pady=5, sticky='w')
            cb.bind("<Button-3>", lambda e, kw=keyword: confirm_remove_keyword(kw))
            col += 1
            if col >= 8:
                col = 0
                row += 1

    def add_keyword():
        new_keyword = simpledialog.askstring("添加筛选词", "请输入新的筛选词:", parent=settings_window)
        if new_keyword and new_keyword not in keyword_vars:
            var = tk.BooleanVar(value=True)
            keyword_vars[new_keyword] = var
            update_keywords_display()

    def confirm_remove_keyword(keyword):
        if messagebox.askokcancel("确认删除", f"你确定要删除筛选词 '{keyword}' 吗?"):
            remove_keyword(keyword)

    def remove_keyword(keyword):
        if keyword in keyword_vars:
            del keyword_vars[keyword]
            update_keywords_display()

    add_keyword_button = tk.Button(settings_window, text="添加筛选词", command=add_keyword)
    add_keyword_button.pack(pady=5)

    update_keywords_display()

    save_button = tk.Button(settings_window, text="关闭", command=settings_window.destroy)
    save_button.pack(pady=5)

def initialize_keywords():
    global keyword_vars
    for keyword in default_invalid_keywords:
        var = tk.BooleanVar(value=True)
        keyword_vars[keyword] = var

root = tk.Tk()
root.title("链接有效性检查工具")

main_frame = tk.Frame(root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

left_frame = tk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

right_frame = tk.Frame(main_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

tk.Label(left_frame, text="请输入包含链接的文本:").pack()

text_area = scrolledtext.ScrolledText(left_frame, wrap=tk.WORD, width=50, height=20)
text_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)

# Add mode selection
mode_var = tk.StringVar(value="正常筛选")
tk.Label(left_frame, text="选择筛选模式:").pack()
modes = ["快速筛选", "正常筛选", "仔细筛选"]
for mode in modes:
    rb = tk.Radiobutton(left_frame, text=mode, variable=mode_var, value=mode)
    rb.pack(anchor='w')

# Add small threads option
small_threads_var = tk.BooleanVar(value=False)
small_threads_checkbutton = tk.Checkbutton(left_frame, text="启用小线程模式", variable=small_threads_var)
small_threads_checkbutton.pack(anchor='w')

check_button = tk.Button(left_frame, text="检查链接", command=lambda: asyncio.create_task(check_links()))
check_button.pack(pady=5)

cancel_button = tk.Button(left_frame, text="取消", command=cancel_checking, state=tk.DISABLED)
cancel_button.pack(pady=5)

clear_button = tk.Button(left_frame, text="清除文本", command=clear_text)
clear_button.pack(pady=5)

settings_button = tk.Button(left_frame, text="设置", command=open_settings)
settings_button.pack(pady=5)

tk.Label(right_frame, text="结果:").pack()

result_area = scrolledtext.ScrolledText(right_frame, wrap=tk.WORD, width=50, height=20)
result_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True)
result_area.config(state=tk.DISABLED)
result_area.bind("<Button-1>", open_link)

# Add progress bar
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(left_frame, orient="horizontal", length=400, mode="determinate", variable=progress_var)
progress_bar.pack(pady=5)

# Initialize keyword variables
initialize_keywords()

# Ensure WebDriver is closed on exit
def on_closing():
    close_driver()
    root.destroy()

root.protocol("WM_DELETE_WINDOW", on_closing)

# Run the main loop
root.mainloop()

将requests_cache替换为aiohttp
使用asyncio.create_task而不是asyncio.run来避免阻塞tkinter事件循环
添加了初始化和关闭SeleniumWebDriver的正确处理
马了顶大 发表于 2024-5-29 09:32
ai写的就去问ai
知心 发表于 2024-5-29 09:38
你详细描述一下需要什么功能
 楼主| shiyun01 发表于 2024-5-29 11:02
就是想筛选链接那些可以打开,那些已经失效了,现在的问题就是打开后他要么是太多检测错了,要么是一直开在检查界面那
adx123456 发表于 2024-5-29 11:07
hualy 发表于 2024-5-29 09:21
人家有注释的,只不过是英文

我说的是  函数 注释,他那是在组件哪里注释的,有什么用。学过的人都知道是什么意思
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-12-12 08:32

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表