吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4045|回复: 6
收起左侧

[原创工具] vt检测工具

[复制链接]
神奇的人鱼 发表于 2025-3-18 16:48
截图如下
image.png
主要功能:
1. 直接拖放exe dll docx doc xlsx xls 文件可以直接上vt查询是否存在并且是否是恶意
2. 拖放包含文件hash的文本文件,自动读取所有hash(格式是一行一个文件hash),通过vt查询

需要注意:
需要把自己的vt api 填写到conf目录下的conf.json中

软件下载链接:
https://wwi.lanzoub.com/iKPza2qxuoab
解压密码:52pj

主要代码如下:

vt.py

import logging
import queue
import threading
import time
import filetype
import pyperclip
import tkinter as tk
from tkinter import ttk
from tkinter import Menu
from tkinterdnd2 import TkinterDnD, DND_FILES

from util.utils import *

# 创建一个锁对象
vt_lock = threading.Lock()

# 创建一个队列来管理待查询的文件
query_queue = queue.Queue()

class EmlFileApp(TkinterDnD.Tk):
    def __init__(self):
        super().__init__()

        self.title("文件hash vt检测")
        self.geometry("1560x600")

        # 创建表格显示文件信息
        self.tree = ttk.Treeview(self,
                                 columns=("upload_filehash", "vt_status", "md5", "sha1", "sha256",
                                          "detection_results"),
                                 show='headings')

        # 固定部分列宽
        self.tree.column("vt_status", width=120, stretch=False)
        self.tree.column("upload_filehash", width=320, stretch=False)
        self.tree.column("detection_results", width=120, stretch=False)

        self.tree.column("md5", width=240, stretch=False)
        self.tree.column("sha1", width=300, stretch=False)
        self.tree.column("sha256", width=460, stretch=False)  # SHA256列权重更高

        self.tree.heading("upload_filehash", text="上传文件哈希")
        self.tree.heading("vt_status", text="VT 查询结果")
        self.tree.heading("md5", text="MD5")
        self.tree.heading("sha1", text="SHA1")
        self.tree.heading("sha256", text="SHA256")
        self.tree.heading("detection_results", text="检测/总引擎数")
        self.tree.pack(fill=tk.BOTH, expand=True)

        # 创建上下文菜单
        self.context_menu = Menu(self.tree, tearoff=0)
        self.context_menu.add_command(label="复制整行", command=lambda: self.copy_tree_item())
        self.context_menu.add_command(label="复制选中项", command=lambda: self.copy_selected_column())

        # 支持文件拖放
        self.drop_target_register(DND_FILES)
        self.dnd_bind('<<Drop>>', self.on_drop)

        # 启动查询队列处理线程
        self.query_thread = threading.Thread(target=self.process_query_queue)
        self.query_thread.daemon = True
        self.query_thread.start()

        self.tree.bind("<Button-3>", self.on_right_click)

        self.current_item = None
        self.current_column = None

    def on_right_click(self, event):
        # 获取点击位置的项和列
        item = self.tree.identify_row(event.y)
        column = self.tree.identify_column(event.x)

        if item:
            # 存储当前选中项和列
            self.current_item = item
            self.current_column = column
            self.context_menu.post(event.x_root, event.y_root)

    def copy_tree_item(self):
        try:
            # 获取当前项数据并确保可迭代
            item = self.tree.item(self.current_item)
            item_data = item['values'] if item else []

            # 类型安全转换:处理所有元素类型(包括bool/None/list)
            sanitized_data = (
                str(x) if not isinstance(x, (list, dict))  # 基础类型转字符串
                else ' '.join(map(str, x)) if isinstance(x, list)  # 列表转空格分隔字符串
                else '[dict]'  # 字典特殊标记
                for x in item_data
            )

            # 空数据保护
            csv_line = "\t".join(sanitized_data) if item_data else ""
            pyperclip.copy(csv_line)

        except (AttributeError, KeyError, TypeError) as e:
            logging.warning(f"复制失败: {str(e)}")
            pyperclip.copy("")  # 清空剪贴板避免残留旧数据

    def copy_selected_column(self):
        try:
            col_index = int(self.current_column.replace('#', '')) - 1
            item_data = self.tree.item(self.current_item)['values'][col_index]
            pyperclip.copy(str(item_data))
        except (IndexError, AttributeError):
            pass

    # 拖放文件处理
    def on_drop(self, event):
        pattern = r'(?:{([^}]*)}|([^\s]+))'
        matches = re.findall(pattern, event.data)

        # 提取文件路径并获取文件名
        file_paths = []
        for match in matches:
            # Match 有两个捕获组,优先选择第一个非空捕获
            file_path = match[0] or match[1]
            file_paths.append(file_path)

        for file_path in file_paths:
            self.send_vt_check(file_path)

    def send_vt_check(self, file_path: str):

        try:
            # 获取文件真实后缀
            kind = filetype.guess(file_path)
            if kind is not None:
                if kind.extension in ["exe", "dll", "docx", "doc", "xlsx", "xls"]:
                    file_info_list = get_file_info(file_path)
                else:
                    file_info_list = get_file_list_with_txt(file_path)
            else:
                    file_info_list = get_file_list_with_txt(file_path)

            for file_info in file_info_list:
                file_upload_filehash = file_info["filehash"]
                file_vt_status = "等待查询"
                file_md5 = "等待查询"
                file_sha1 = "等待查询"
                file_sha256 = "等待查询"
                file_detection_results = "等待查询"

                # 在表格中显示文件信息
                item_id = self.tree.insert('', tk.END,
                                           values=(file_upload_filehash, file_vt_status, file_md5, file_sha1,
                                                   file_sha256, file_detection_results))
                # 启动线程查询文件信息
                send_thread = threading.Thread(target=self.check_vt,
                                               args=(file_upload_filehash, item_id))
                send_thread.start()

        except Exception:
            self.update_status("Error processing file")

    # 更新表格中的状态
    def update_tree_status(self, item_id,
                           file_vt_status=None,
                           file_md5=None,
                           file_sha1=None,
                           file_sha256=None,
                           file_detection_results=None
                           ):
        self.after(0, lambda: self.tree.item(item_id, values=(
            self.tree.item(item_id, 'values')[0],  # 文件hash,不变
            self.tree.item(item_id, 'values')[1] if file_vt_status is None else (
                file_vt_status if isinstance(file_vt_status, str) else
                "文件存在" if file_vt_status else "文件不存在"),  # vt查询状态
            self.tree.item(item_id, 'values')[2] if file_md5 is None else file_md5,  # MD5值
            self.tree.item(item_id, 'values')[3] if file_sha1 is None else file_sha1,  # SHA1值
            self.tree.item(item_id, 'values')[4] if file_sha256 is None else file_sha256,  # SHA256值
            self.tree.item(item_id, 'values')[
                5] if file_detection_results is None else file_detection_results if isinstance(file_vt_status,
                    str) else f"{file_detection_results["positives"]}/{file_detection_results["total"]}",
        )))

    def check_vt(self, file_upload_filehash, item_id):
        try:
            # 更新状态
            self.update_tree_status(item_id, "加入VT查询队列", "-", "-", "-",
                                    "-")
            # 将文件信息放入查询队列
            query_queue.put((file_upload_filehash, item_id))

        except Exception:
            self.update_tree_status(item_id, "查询失败")

    # 处理查询队列
    def process_query_queue(self):
        while True:
            if not query_queue.empty():
                file_upload_filehash, item_id = query_queue.get()
                result_queue = queue.Queue()

                self.update_tree_status(item_id, "开始查询")

                thread = threading.Thread(target=self.start_vt_query,
                                          args=(file_upload_filehash, item_id, result_queue))
                thread.start()
                thread.join()  # 等待查询线程完成
                vt_check_result = result_queue.get()  # 获取查询结果
                if vt_check_result is not None:
                    file_vt_status = vt_check_result["exists"]
                    file_md5 = vt_check_result["md5"]
                    file_sha1 = vt_check_result["sha1"]
                    file_sha256 = vt_check_result["sha256"]
                    file_detection_results = vt_check_result["detection_results"]

                    self.update_tree_status(item_id, file_vt_status, file_md5, file_sha1, file_sha256,
                                            file_detection_results)

                    query_queue.task_done()  # 查询完成,标记任务已完成
            else:
                # 队列为空时休眠一段时间
                time.sleep(1)

    def start_vt_query(self, file_upload_filehash, item_id, result_queue):
        # 定义一个线程安全的停止标志和查询计数器
        stop_event = threading.Event()
        query_count = 0

        def query():
            nonlocal query_count
            if stop_event.is_set():
                return  # 如果停止标志已设置,直接退出

            # 执行 VT 查询
            try:
                with vt_lock:  # 使用锁确保同一时间只有一个 vt 查询
                    vt_check_result = get_virustotal_report(file_upload_filehash)

                    time.sleep(2)
            except Exception:
                self.update_tree_status(item_id, "查询错误")

            # 检查查询结果是否为目标状态或查询次数是否超过3次
            if vt_check_result is not None or query_count >= 3:
                stop_event.set()  # 设置停止标志
                result_queue.put(vt_check_result)  # 将查询结果放入结果队列
            else:
                query_count += 1
                self.update_tree_status(item_id, f"开始第{query_count}次查询")
                # 如果未达到目标状态,继续定时查询
                threading.Timer(10, query).start()

        # 启动第一次查询
        query()

# 主程序入口
if __name__ == "__main__":
    app = EmlFileApp()
    app.mainloop()

conf.py

import json
import os
import sys

def get_current_directory():
    # sys.argv[0] 返回实际的 EXE 路径
    return os.path.dirname(os.path.abspath(sys.argv[0]))

def load_config():
    # 获取当前文件所在目录的父目录
    parent_dir = get_current_directory()

    print(parent_dir)

    # 配置文件位于父级目录的同级目录的 "config" 文件夹中
    config_path = os.path.join(parent_dir, "conf", "conf.json")

    # 检查文件是否存在
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Configuration file not found at {config_path}")

    # 加载 JSON 配置
    with open(config_path, "r") as file:
        return json.load(file)

# 加载配置
config = load_config()

# 示例: 访问配置值
API_KEY = config["API_KEY"]
BASE_URL = config["BASE_URL"]

utils.py

import hashlib

import requests
from .conf import *
from requests.exceptions import Timeout, RequestException

"""
{
    'detection_results': {'positives': 35, 'total': 73},
    'error': None,
    'exists': True,
    'md5': '5a13a7a2b420744d29aa6416486ea607',
    'sha1': '163b1fdc1560983732665361ddfb14947684d676',
    'sha256': '78db26380559e6cdecae74ed8faf39e968387bfb7c3f36029c3f2d1b2356739a'
 }
 """

import re

def is_hex(s):
    return re.match(r"^[0-9a-fA-F]+$", s) is not None

def check_hash_type(input_str):
    if len(input_str) == 32 and is_hex(input_str):
        return True, "md5"
    elif len(input_str) == 40 and is_hex(input_str):
        return True, "sha1"
    elif len(input_str) == 64 and is_hex(input_str):
        return True, "sha256"
    else:
        return False

def get_virustotal_report(file_hash: str) -> dict:
    """
    获取VirusTotal文件检测报告
    :param file_hash: 文件哈希值
    :return: 包含检测结果的JSON对象
    """

    # 初始化返回结构
    result = {
        "exists": False,
        "md5": None,
        "sha1": None,
        "sha256": None,
        "detection_results": {"positives": 0, "total": 0},
        "TrendMicro": False,
        "error": None
    }

    params = {
        "apikey": API_KEY,
        "resource": file_hash
    }

    try:
        # 添加超时参数(连接超时5秒,读取超时15秒)
        response = requests.get(BASE_URL, params=params, timeout=(5, 15))
        response.raise_for_status()  # 检查HTTP状态码(非200抛出异常)

        data = response.json()

        # 判断vt上是否存在此样本
        if data.get("response_code") == 1:
            result["exists"] = True
            result["md5"] = data.get("md5")
            result["sha1"] = data.get("sha1")
            result["sha256"] = data.get("sha256")

            result["detection_results"] = {
                "positives": data.get("positives", 0),
                "total":  data.get("total", 0)
            }

    except Timeout:
        result["error"] = "请求超时,请检查网络连接或稍后重试"
    except RequestException as e:
        result["error"] = f"网络请求异常: {str(e)}"
    except (KeyError, ValueError) as e:
        result["error"] = f"响应数据解析错误: {str(e)}"

    return result

def get_file_info(file: str) -> list:
    # 计算SHA1哈希值
    with open(file, 'rb') as f:
        sha1 = hashlib.sha1()
        while chunk := f.read(1024):
            sha1.update(chunk)
    filehash = sha1.hexdigest()
    return [{"filename":os.path.basename(file),
            "filehash": filehash}]

def get_file_list_with_txt(txt_file: str) -> list:

    with open(txt_file) as f:

        file_info_list = []

        for line in f:
            hashStr = line.strip()
            hash_status, hash_type = check_hash_type(hashStr)
            if hash_status:
                file_info_list.append({
                    "filename":hashStr,
                    "filehash": hashStr
                })

    return file_info_list

免费评分

参与人数 2吾爱币 +8 热心值 +2 收起 理由
heavenman + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
风之暇想 + 7 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

QAZWSX666 发表于 2025-3-18 16:55
感谢大佬分享,下载学习了
walykyy 发表于 2025-3-18 17:04
ltgb 发表于 2025-3-18 17:48
 楼主| 神奇的人鱼 发表于 2025-3-18 17:54
ltgb 发表于 2025-3-18 17:48
检测vt有什么用?

把文件hash提交到vt上看看是不是恶意文件
binghe01 发表于 2025-3-18 19:25
感谢大佬分享,学到了学到了
性感小嘴 发表于 2025-3-23 12:00
这个我知道 模拟器经常用
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-4-17 09:22

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表