vt.py
import logging
import queue
import threading
import time
import filetype
import pyperclip
import tkinter as tk
from tkinter import ttk
from tkinter import Menu
from tkinterdnd2 import TkinterDnD, DND_FILES
from util.utils import *
# 创建一个锁对象
vt_lock = threading.Lock()
# 创建一个队列来管理待查询的文件
query_queue = queue.Queue()
class EmlFileApp(TkinterDnD.Tk):
def __init__(self):
super().__init__()
self.title("文件hash vt检测")
self.geometry("1560x600")
# 创建表格显示文件信息
self.tree = ttk.Treeview(self,
columns=("upload_filehash", "vt_status", "md5", "sha1", "sha256",
"detection_results"),
show='headings')
# 固定部分列宽
self.tree.column("vt_status", width=120, stretch=False)
self.tree.column("upload_filehash", width=320, stretch=False)
self.tree.column("detection_results", width=120, stretch=False)
self.tree.column("md5", width=240, stretch=False)
self.tree.column("sha1", width=300, stretch=False)
self.tree.column("sha256", width=460, stretch=False) # SHA256列权重更高
self.tree.heading("upload_filehash", text="上传文件哈希")
self.tree.heading("vt_status", text="VT 查询结果")
self.tree.heading("md5", text="MD5")
self.tree.heading("sha1", text="SHA1")
self.tree.heading("sha256", text="SHA256")
self.tree.heading("detection_results", text="检测/总引擎数")
self.tree.pack(fill=tk.BOTH, expand=True)
# 创建上下文菜单
self.context_menu = Menu(self.tree, tearoff=0)
self.context_menu.add_command(label="复制整行", command=lambda: self.copy_tree_item())
self.context_menu.add_command(label="复制选中项", command=lambda: self.copy_selected_column())
# 支持文件拖放
self.drop_target_register(DND_FILES)
self.dnd_bind('<<Drop>>', self.on_drop)
# 启动查询队列处理线程
self.query_thread = threading.Thread(target=self.process_query_queue)
self.query_thread.daemon = True
self.query_thread.start()
self.tree.bind("<Button-3>", self.on_right_click)
self.current_item = None
self.current_column = None
def on_right_click(self, event):
# 获取点击位置的项和列
item = self.tree.identify_row(event.y)
column = self.tree.identify_column(event.x)
if item:
# 存储当前选中项和列
self.current_item = item
self.current_column = column
self.context_menu.post(event.x_root, event.y_root)
def copy_tree_item(self):
try:
# 获取当前项数据并确保可迭代
item = self.tree.item(self.current_item)
item_data = item['values'] if item else []
# 类型安全转换:处理所有元素类型(包括bool/None/list)
sanitized_data = (
str(x) if not isinstance(x, (list, dict)) # 基础类型转字符串
else ' '.join(map(str, x)) if isinstance(x, list) # 列表转空格分隔字符串
else '[dict]' # 字典特殊标记
for x in item_data
)
# 空数据保护
csv_line = "\t".join(sanitized_data) if item_data else ""
pyperclip.copy(csv_line)
except (AttributeError, KeyError, TypeError) as e:
logging.warning(f"复制失败: {str(e)}")
pyperclip.copy("") # 清空剪贴板避免残留旧数据
def copy_selected_column(self):
try:
col_index = int(self.current_column.replace('#', '')) - 1
item_data = self.tree.item(self.current_item)['values'][col_index]
pyperclip.copy(str(item_data))
except (IndexError, AttributeError):
pass
# 拖放文件处理
def on_drop(self, event):
pattern = r'(?:{([^}]*)}|([^\s]+))'
matches = re.findall(pattern, event.data)
# 提取文件路径并获取文件名
file_paths = []
for match in matches:
# Match 有两个捕获组,优先选择第一个非空捕获
file_path = match[0] or match[1]
file_paths.append(file_path)
for file_path in file_paths:
self.send_vt_check(file_path)
def send_vt_check(self, file_path: str):
try:
# 获取文件真实后缀
kind = filetype.guess(file_path)
if kind is not None:
if kind.extension in ["exe", "dll", "docx", "doc", "xlsx", "xls"]:
file_info_list = get_file_info(file_path)
else:
file_info_list = get_file_list_with_txt(file_path)
else:
file_info_list = get_file_list_with_txt(file_path)
for file_info in file_info_list:
file_upload_filehash = file_info["filehash"]
file_vt_status = "等待查询"
file_md5 = "等待查询"
file_sha1 = "等待查询"
file_sha256 = "等待查询"
file_detection_results = "等待查询"
# 在表格中显示文件信息
item_id = self.tree.insert('', tk.END,
values=(file_upload_filehash, file_vt_status, file_md5, file_sha1,
file_sha256, file_detection_results))
# 启动线程查询文件信息
send_thread = threading.Thread(target=self.check_vt,
args=(file_upload_filehash, item_id))
send_thread.start()
except Exception:
self.update_status("Error processing file")
# 更新表格中的状态
def update_tree_status(self, item_id,
file_vt_status=None,
file_md5=None,
file_sha1=None,
file_sha256=None,
file_detection_results=None
):
self.after(0, lambda: self.tree.item(item_id, values=(
self.tree.item(item_id, 'values')[0], # 文件hash,不变
self.tree.item(item_id, 'values')[1] if file_vt_status is None else (
file_vt_status if isinstance(file_vt_status, str) else
"文件存在" if file_vt_status else "文件不存在"), # vt查询状态
self.tree.item(item_id, 'values')[2] if file_md5 is None else file_md5, # MD5值
self.tree.item(item_id, 'values')[3] if file_sha1 is None else file_sha1, # SHA1值
self.tree.item(item_id, 'values')[4] if file_sha256 is None else file_sha256, # SHA256值
self.tree.item(item_id, 'values')[
5] if file_detection_results is None else file_detection_results if isinstance(file_vt_status,
str) else f"{file_detection_results["positives"]}/{file_detection_results["total"]}",
)))
def check_vt(self, file_upload_filehash, item_id):
try:
# 更新状态
self.update_tree_status(item_id, "加入VT查询队列", "-", "-", "-",
"-")
# 将文件信息放入查询队列
query_queue.put((file_upload_filehash, item_id))
except Exception:
self.update_tree_status(item_id, "查询失败")
# 处理查询队列
def process_query_queue(self):
while True:
if not query_queue.empty():
file_upload_filehash, item_id = query_queue.get()
result_queue = queue.Queue()
self.update_tree_status(item_id, "开始查询")
thread = threading.Thread(target=self.start_vt_query,
args=(file_upload_filehash, item_id, result_queue))
thread.start()
thread.join() # 等待查询线程完成
vt_check_result = result_queue.get() # 获取查询结果
if vt_check_result is not None:
file_vt_status = vt_check_result["exists"]
file_md5 = vt_check_result["md5"]
file_sha1 = vt_check_result["sha1"]
file_sha256 = vt_check_result["sha256"]
file_detection_results = vt_check_result["detection_results"]
self.update_tree_status(item_id, file_vt_status, file_md5, file_sha1, file_sha256,
file_detection_results)
query_queue.task_done() # 查询完成,标记任务已完成
else:
# 队列为空时休眠一段时间
time.sleep(1)
def start_vt_query(self, file_upload_filehash, item_id, result_queue):
# 定义一个线程安全的停止标志和查询计数器
stop_event = threading.Event()
query_count = 0
def query():
nonlocal query_count
if stop_event.is_set():
return # 如果停止标志已设置,直接退出
# 执行 VT 查询
try:
with vt_lock: # 使用锁确保同一时间只有一个 vt 查询
vt_check_result = get_virustotal_report(file_upload_filehash)
time.sleep(2)
except Exception:
self.update_tree_status(item_id, "查询错误")
# 检查查询结果是否为目标状态或查询次数是否超过3次
if vt_check_result is not None or query_count >= 3:
stop_event.set() # 设置停止标志
result_queue.put(vt_check_result) # 将查询结果放入结果队列
else:
query_count += 1
self.update_tree_status(item_id, f"开始第{query_count}次查询")
# 如果未达到目标状态,继续定时查询
threading.Timer(10, query).start()
# 启动第一次查询
query()
# 主程序入口
if __name__ == "__main__":
app = EmlFileApp()
app.mainloop()
conf.py
import json
import os
import sys
def get_current_directory():
# sys.argv[0] 返回实际的 EXE 路径
return os.path.dirname(os.path.abspath(sys.argv[0]))
def load_config():
# 获取当前文件所在目录的父目录
parent_dir = get_current_directory()
print(parent_dir)
# 配置文件位于父级目录的同级目录的 "config" 文件夹中
config_path = os.path.join(parent_dir, "conf", "conf.json")
# 检查文件是否存在
if not os.path.exists(config_path):
raise FileNotFoundError(f"Configuration file not found at {config_path}")
# 加载 JSON 配置
with open(config_path, "r") as file:
return json.load(file)
# 加载配置
config = load_config()
# 示例: 访问配置值
API_KEY = config["API_KEY"]
BASE_URL = config["BASE_URL"]
utils.py
import hashlib
import requests
from .conf import *
from requests.exceptions import Timeout, RequestException
"""
{
'detection_results': {'positives': 35, 'total': 73},
'error': None,
'exists': True,
'md5': '5a13a7a2b420744d29aa6416486ea607',
'sha1': '163b1fdc1560983732665361ddfb14947684d676',
'sha256': '78db26380559e6cdecae74ed8faf39e968387bfb7c3f36029c3f2d1b2356739a'
}
"""
import re
def is_hex(s):
return re.match(r"^[0-9a-fA-F]+$", s) is not None
def check_hash_type(input_str):
if len(input_str) == 32 and is_hex(input_str):
return True, "md5"
elif len(input_str) == 40 and is_hex(input_str):
return True, "sha1"
elif len(input_str) == 64 and is_hex(input_str):
return True, "sha256"
else:
return False
def get_virustotal_report(file_hash: str) -> dict:
"""
获取VirusTotal文件检测报告
:param file_hash: 文件哈希值
:return: 包含检测结果的JSON对象
"""
# 初始化返回结构
result = {
"exists": False,
"md5": None,
"sha1": None,
"sha256": None,
"detection_results": {"positives": 0, "total": 0},
"TrendMicro": False,
"error": None
}
params = {
"apikey": API_KEY,
"resource": file_hash
}
try:
# 添加超时参数(连接超时5秒,读取超时15秒)
response = requests.get(BASE_URL, params=params, timeout=(5, 15))
response.raise_for_status() # 检查HTTP状态码(非200抛出异常)
data = response.json()
# 判断vt上是否存在此样本
if data.get("response_code") == 1:
result["exists"] = True
result["md5"] = data.get("md5")
result["sha1"] = data.get("sha1")
result["sha256"] = data.get("sha256")
result["detection_results"] = {
"positives": data.get("positives", 0),
"total": data.get("total", 0)
}
except Timeout:
result["error"] = "请求超时,请检查网络连接或稍后重试"
except RequestException as e:
result["error"] = f"网络请求异常: {str(e)}"
except (KeyError, ValueError) as e:
result["error"] = f"响应数据解析错误: {str(e)}"
return result
def get_file_info(file: str) -> list:
# 计算SHA1哈希值
with open(file, 'rb') as f:
sha1 = hashlib.sha1()
while chunk := f.read(1024):
sha1.update(chunk)
filehash = sha1.hexdigest()
return [{"filename":os.path.basename(file),
"filehash": filehash}]
def get_file_list_with_txt(txt_file: str) -> list:
with open(txt_file) as f:
file_info_list = []
for line in f:
hashStr = line.strip()
hash_status, hash_type = check_hash_type(hashStr)
if hash_status:
file_info_list.append({
"filename":hashStr,
"filehash": hashStr
})
return file_info_list