吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2042|回复: 5
收起左侧

[Packers] 新手试试初发个分析VMProtect保护生成脚本的半成品,大家多多关照,不喜勿喷!!!

[复制链接]
Jackdeng 发表于 2025-11-29 09:56
先发下运行后的输出:
D:\anaconda3\envs\vnpy-py311\python.exe E:\Project\VMProtect-devirtualization\main.py
2025-11-29 09:35:27,752 - VMProtectDevirtualizer - INFO - 基础日志系统初始化完成
2025-11-29 09:35:27,760 - VMProtectDevirtualizer - INFO - 已加载配置文件: E:\Project\VMProtect-devirtualization\config\config.yaml
2025-11-29 09:35:27,763 - VMProtectDevirtualizer - INFO - [main.py:258] - 日志系统重新配置完成,级别: INFO
2025-11-29 09:35:27,763 - VMProtectDevirtualizer - INFO - 日志系统重新配置完成,级别: INFO
2025-11-29 09:35:27,764 - VMProtectDevirtualizer - INFO - [main.py:287] - 初始化VMProtect反虚拟化分析组件...
2025-11-29 09:35:27,764 - VMProtectDevirtualizer - INFO - 初始化VMProtect反虚拟化分析组件...
2025-11-29 09:35:27,765 - modules.core.trace_parser - INFO - Capstone反汇编器初始化成功
2025-11-29 09:35:27,792 - modules.output.x32dbg_exporter - INFO - x32dbg导出器初始化完成
2025-11-29 09:35:28,244 - modules.utils.visualization - INFO - 优化整合版可视化工具初始化完成,输出目录: output\graphs
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - [main.py:349] - 组件初始化完成,总耗时: 0.48秒
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - 组件初始化完成,总耗时: 0.48秒
2025-11-29 09:35:28,244 - modules.integration.lyscript_integration - INFO - LyScript集成初始化完成
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - [main.py:81] - LyScript集成状态: {'lyscript32_available': True, 'lyscript64_available': False, 'dynamic_analysis': True, 'memory_analysis': True, 'runtime_inspection': True}
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - LyScript集成状态: {'lyscript32_available': True, 'lyscript64_available': False, 'dynamic_analysis': True, 'memory_analysis': True, 'runtime_inspection': True}
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - [main.py:367] - 分析会话ID: 20251129_093528
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - 分析会话ID: 20251129_093528
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - [main.py:372] - 开始分析跟踪文件: vmp_traces\sample1.vmp.trace (模式: deep)
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - 开始分析跟踪文件: vmp_traces\sample1.vmp.trace (模式: deep)
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - [main.py:381] - 步骤1: 解析跟踪文件...
2025-11-29 09:35:28,244 - VMProtectDevirtualizer - INFO - 步骤1: 解析跟踪文件...
2025-11-29 09:35:28,245 - modules.core.trace_parser - INFO - 开始解析跟踪文件: vmp_traces\sample1.vmp.trace
2025-11-29 09:35:28,775 - modules.core.trace_parser - INFO - 成功解析 10815 条指令
2025-11-29 09:35:28,775 - VMProtectDevirtualizer - INFO - [main.py:397] - 解析到 10815 条指令
2025-11-29 09:35:28,775 - VMProtectDevirtualizer - INFO - 解析到 10815 条指令
2025-11-29 09:35:28,775 - VMProtectDevirtualizer - INFO - [main.py:400] - 步骤2: 检测VMProtect虚拟化特征...
2025-11-29 09:35:28,775 - VMProtectDevirtualizer - INFO - 步骤2: 检测VMProtect虚拟化特征...
2025-11-29 09:35:28,776 - modules.core.vm_detector - INFO - 开始VM特征检测,共 10815 条指令
2025-11-29 09:35:28,923 - modules.core.vm_detector - INFO - VM特征检测完成,置信度: 1.00
2025-11-29 09:35:28,923 - VMProtectDevirtualizer - INFO - [main.py:405] - 步骤3: 构建控制流图...
2025-11-29 09:35:28,923 - VMProtectDevirtualizer - INFO - 步骤3: 构建控制流图...
2025-11-29 09:35:28,924 - modules.core.cfg_builder - INFO - 构建VM控制流图...
2025-11-29 09:35:29,149 - modules.core.cfg_builder - INFO - CFG构建完成: 2244 个基本块, 2243 条边
2025-11-29 09:35:29,150 - VMProtectDevirtualizer - INFO - [main.py:410] - 步骤4: 执行复杂度分析...
2025-11-29 09:35:29,150 - VMProtectDevirtualizer - INFO - 步骤4: 执行复杂度分析...
2025-11-29 09:35:29,150 - modules.analysis.complexity_analyzer - INFO - 开始复杂度分析...
2025-11-29 09:35:29,384 - modules.analysis.complexity_analyzer - INFO - 复杂度分析完成: 保护级别=low, 分数=18.60
2025-11-29 09:35:29,384 - VMProtectDevirtualizer - INFO - [main.py:415] - 步骤5: 执行统计分析...
2025-11-29 09:35:29,384 - VMProtectDevirtualizer - INFO - 步骤5: 执行统计分析...
2025-11-29 09:35:29,384 - modules.analysis.statistical_analyzer - INFO - 开始统计分析,共 10815 条指令
2025-11-29 09:35:29,497 - VMProtectDevirtualizer - INFO - [main.py:420] - 步骤6: 执行模式分析...
2025-11-29 09:35:29,497 - VMProtectDevirtualizer - INFO - 步骤6: 执行模式分析...
2025-11-29 09:35:29,497 - modules.analysis.pattern_analyzer - INFO - 开始模式分析...
2025-11-29 09:35:29,521 - modules.analysis.pattern_analyzer - INFO - 分析VMP 3.x+特征模式...
2025-11-29 09:35:29,541 - modules.analysis.pattern_analyzer - INFO - 模式分析完成,检测到 8 种模式
2025-11-29 09:35:29,541 - VMProtectDevirtualizer - INFO - [main.py:425] - 步骤7: 执行行为分析...
2025-11-29 09:35:29,541 - VMProtectDevirtualizer - INFO - 步骤7: 执行行为分析...
2025-11-29 09:35:29,542 - modules.analysis.behavior_analyzer - INFO - 分析VM运行时行为...
2025-11-29 09:35:29,603 - modules.analysis.behavior_analyzer - INFO - 行为分析完成
2025-11-29 09:35:29,603 - VMProtectDevirtualizer - INFO - [main.py:430] - 步骤8: 估算VMProtect版本...
2025-11-29 09:35:29,603 - VMProtectDevirtualizer - INFO - 步骤8: 估算VMProtect版本...
2025-11-29 09:35:29,604 - modules.analysis.version_estimator - INFO - 估算VMProtect保护版本...
2025-11-29 09:35:29,604 - modules.analysis.version_estimator - INFO - 版本估算完成: vmp1.x (置信度: 0.12)
2025-11-29 09:35:29,604 - VMProtectDevirtualizer - INFO - [main.py:436] - 【DEEP模式】启用增强分析流程...
2025-11-29 09:35:29,604 - VMProtectDevirtualizer - INFO - 【DEEP模式】启用增强分析流程...
2025-11-29 09:35:29,604 - VMProtectDevirtualizer - INFO - [main.py:439] - → 执行符号执行分析...
2025-11-29 09:35:29,604 - VMProtectDevirtualizer - INFO - → 执行符号执行分析...
2025-11-29 09:35:29,604 - modules.execution.symbolic_engine - INFO - 使用符号执行分析指令和控制流图...
2025-11-29 09:35:29,604 - modules.execution.symbolic_engine - INFO - 开始符号执行分析...
2025-11-29 09:35:29,694 - VMProtectDevirtualizer - INFO - [main.py:444] - → 重建原始操作...
2025-11-29 09:35:29,694 - VMProtectDevirtualizer - INFO - → 重建原始操作...
2025-11-29 09:35:29,694 - modules.execution.op_reconstructor - INFO - 重建原始x86操作...
2025-11-29 09:35:29,694 - modules.execution.op_reconstructor - INFO - 从跟踪数据重建原始x86操作...
2025-11-29 09:35:29,753 - modules.execution.op_reconstructor - INFO - 操作重建完成: 0 条重建指令, 处理器覆盖率: 0.00%
2025-11-29 09:35:29,755 - VMProtectDevirtualizer - INFO - [main.py:767] - 已导出分析样本到: output\debug_samples\sample_analysis_20251129_093528.json
2025-11-29 09:35:29,755 - VMProtectDevirtualizer - INFO - 已导出分析样本到: output\debug_samples\sample_analysis_20251129_093528.json
2025-11-29 09:35:29,755 - VMProtectDevirtualizer - INFO - [main.py:466] - 分析完成!总耗时: 1.51秒
2025-11-29 09:35:29,755 - VMProtectDevirtualizer - INFO - 分析完成!总耗时: 1.51秒
2025-11-29 09:35:29,756 - VMProtectDevirtualizer - INFO - [main.py:775] - 生成HTML格式报告...
2025-11-29 09:35:29,756 - VMProtectDevirtualizer - INFO - 生成HTML格式报告...
2025-11-29 09:35:29,756 - modules.output.report_generator - INFO - 生成分析报告...
2025-11-29 09:35:29,756 - modules.output.report_generator - INFO - 生成分析报告,格式: ['json', 'txt', 'html']
2025-11-29 09:35:30,630 - modules.output.report_generator - INFO - 报告已生成: ['html\\vmp_analysis_20251129_093529.json', 'html\\vmp_analysis_20251129_093529.txt', 'html\\vmp_analysis_20251129_093529.html']
2025-11-29 09:35:30,630 - VMProtectDevirtualizer - INFO - [main.py:788] - 报告生成成功: {'status': 'success', 'report_path': 'html\\vmp_analysis_20251129_093529.json', 'output_directory': 'html', 'formats_generated': ['json', 'txt', 'html'], 'message': '报告生成完成'}
2025-11-29 09:35:30,630 - VMProtectDevirtualizer - INFO - 报告生成成功: {'status': 'success', 'report_path': 'html\\vmp_analysis_20251129_093529.json', 'output_directory': 'html', 'formats_generated': ['json', 'txt', 'html'], 'message': '报告生成完成'}
2025-11-29 09:35:30,631 - VMProtectDevirtualizer - INFO - [main.py:846] - 执行摘要已生成: output\reports\executive_summary_20251129_093528.txt
2025-11-29 09:35:30,631 - VMProtectDevirtualizer - INFO - 执行摘要已生成: output\reports\executive_summary_20251129_093528.txt
2025-11-29 09:35:30,631 - VMProtectDevirtualizer - INFO - [main.py:881] - 生成可视化结果...
2025-11-29 09:35:30,631 - VMProtectDevirtualizer - INFO - 生成可视化结果...
2025-11-29 09:35:30,631 - modules.utils.visualization - INFO - 开始生成增强版可视化报告...
✓ 分析报告已生成: {'status': 'success', 'report_path': 'html\\vmp_analysis_20251129_093529.json', 'output_directory': 'html', 'formats_generated': ['json', 'txt', 'html'], 'message': '报告生成完成'}
2025-11-29 09:35:31,103 - modules.utils.visualization - INFO - 复杂度图表已保存: output\graphs\complexity_radar.png
2025-11-29 09:35:31,341 - modules.utils.visualization - INFO - VM特征图表已保存: output\graphs\vm_features.png
2025-11-29 09:35:31,554 - modules.utils.visualization - INFO - 指令分布图表已保存: output\graphs\instruction_distribution.png
2025-11-29 09:35:31,889 - modules.utils.visualization - INFO - 保护级别图表已保存: output\graphs\protection_level.png
2025-11-29 09:36:32,327 - modules.utils.visualization - INFO - 控制流图已保存: output\graphs\control_flow_graph.png
2025-11-29 09:36:32,694 - modules.utils.visualization - INFO - 虚拟机架构图已保存: output\graphs\vm_architecture.png
2025-11-29 09:36:33,536 - modules.utils.visualization - INFO - 指令分析图已保存: output\graphs\instruction_analysis.png
2025-11-29 09:36:34,404 - modules.utils.visualization - INFO - 交互式仪表板已保存: output\graphs\interactive_dashboard.html
2025-11-29 09:36:34,404 - modules.utils.visualization - INFO - 创建分析仪表盘...
2025-11-29 09:36:34,406 - modules.utils.visualization - INFO - 分析仪表盘已创建: output\graphs\analysis_dashboard.html
2025-11-29 09:36:34,406 - modules.utils.visualization - INFO - 增强版可视化报告生成完成
2025-11-29 09:36:34,406 - VMProtectDevirtualizer - INFO - [main.py:892] - 生成交互式仪表板...
2025-11-29 09:36:34,406 - VMProtectDevirtualizer - INFO - 生成交互式仪表板...
2025-11-29 09:36:34,448 - modules.utils.visualization - INFO - 交互式仪表板已保存: output\graphs\interactive_dashboard.html
2025-11-29 09:36:34,448 - VMProtectDevirtualizer - INFO - [main.py:899] - 生成分析仪表盘...
2025-11-29 09:36:34,448 - VMProtectDevirtualizer - INFO - 生成分析仪表盘...
2025-11-29 09:36:34,448 - modules.utils.visualization - INFO - 创建分析仪表盘...
2025-11-29 09:36:34,449 - modules.utils.visualization - INFO - 分析仪表盘已创建: output\graphs\analysis_dashboard.html
2025-11-29 09:36:34,449 - VMProtectDevirtualizer - INFO - [main.py:902] - 分析仪表盘已生成: output\graphs\analysis_dashboard.html
2025-11-29 09:36:34,449 - VMProtectDevirtualizer - INFO - 分析仪表盘已生成: output\graphs\analysis_dashboard.html
2025-11-29 09:36:34,449 - VMProtectDevirtualizer - INFO - [main.py:905] - 生成专业分析图表...
2025-11-29 09:36:34,449 - VMProtectDevirtualizer - INFO - 生成专业分析图表...
2025-11-29 09:37:32,168 - modules.utils.visualization - INFO - 控制流图已保存: output\graphs\detailed_control_flow.png
2025-11-29 09:37:32,972 - modules.utils.visualization - INFO - 指令分析图已保存: output\graphs\detailed_instruction_analysis.png
2025-11-29 09:37:33,340 - modules.utils.visualization - INFO - 虚拟机架构图已保存: output\graphs\vm_architecture_detailed.png
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - [main.py:937] - 生成带图表的HTML报告...
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - 生成带图表的HTML报告...
2025-11-29 09:37:33,340 - modules.utils.visualization - ERROR - 生成HTML报告失败: 'VMVisualization' object has no attribute '_generate_plot_data'
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - [main.py:945] - 增强可视化报告生成完成
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - 增强可视化报告生成完成
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - [main.py:950] - 生成的可视化文件:
2025-11-29 09:37:33,340 - VMProtectDevirtualizer - INFO - 生成的可视化文件:
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - analysis_dashboard.html
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - analysis_dashboard.html
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - complexity_radar.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - complexity_radar.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - control_flow_graph.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - control_flow_graph.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - detailed_control_flow.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - detailed_control_flow.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - detailed_instruction_analysis.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - detailed_instruction_analysis.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - instruction_analysis.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - instruction_analysis.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO - [main.py:952] -   - instruction_distribution.png
2025-11-29 09:37:33,341 - VMProtectDevirtualizer - INFO -   - instruction_distribution.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO - [main.py:952] -   - interactive_dashboard.html
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO -   - interactive_dashboard.html
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO - [main.py:952] -   - protection_level.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO -   - protection_level.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO - [main.py:952] -   - vm_architecture.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO -   - vm_architecture.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO - [main.py:952] -   - vm_architecture_detailed.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO -   - vm_architecture_detailed.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO - [main.py:952] -   - vm_features.png
2025-11-29 09:37:33,342 - VMProtectDevirtualizer - INFO -   - vm_features.png
2025-11-29 09:37:33,343 - VMProtectDevirtualizer - INFO - [main.py:514] - 导出x32dbg脚本到: output/scripts/x32dbg_analysis.py
2025-11-29 09:37:33,343 - VMProtectDevirtualizer - INFO - 导出x32dbg脚本到: output/scripts/x32dbg_analysis.py
2025-11-29 09:37:33,343 - modules.output.x32dbg_exporter - INFO - 导出x32dbg分析脚本到: output\scripts\x32dbg_analysis.py
2025-11-29 09:37:33,343 - modules.output.x32dbg_exporter - INFO - x32dbg脚本导出成功: output\scripts\x32dbg_analysis.py
2025-11-29 09:37:33,343 - VMProtectDevirtualizer - INFO - [main.py:522] - x32dbg脚本导出成功: output/scripts/x32dbg_analysis.py
2025-11-29 09:37:33,343 - VMProtectDevirtualizer - INFO - x32dbg脚本导出成功: output/scripts/x32dbg_analysis.py
2025-11-29 09:37:33,343 - modules.output.x32dbg_exporter - INFO - 导出x64dbg分析脚本到: output\scripts\x32dbg_analysis_x64dbg.txt
2025-11-29 09:37:33,344 - modules.output.x32dbg_exporter - INFO - x64dbg脚本导出成功: output\scripts\x32dbg_analysis_x64dbg.txt
2025-11-29 09:37:33,344 - VMProtectDevirtualizer - INFO - [main.py:859] - 导出IDA脚本到: output/scripts/ida_analysis.py
2025-11-29 09:37:33,344 - VMProtectDevirtualizer - INFO - 导出IDA脚本到: output/scripts/ida_analysis.py
2025-11-29 09:37:33,344 - modules.output.ida_exporter - INFO - 导出IDA分析脚本...
2025-11-29 09:37:33,344 - modules.output.ida_exporter - INFO - 生成IDA Pro分析脚本...
2025-11-29 09:37:33,345 - modules.output.ida_exporter - INFO - IDA脚本已导出到: output\scripts\ida_analysis.py (大小: 10321 字节)
2025-11-29 09:37:33,345 - modules.output.ida_exporter - INFO - IDA脚本导出成功: output\scripts\ida_analysis.py
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - [main.py:867] - IDA脚本导出成功: output/scripts/ida_analysis.py
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - IDA脚本导出成功: output/scripts/ida_analysis.py
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - [main.py:988] - 执行清理操作...
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - 执行清理操作...
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - [main.py:999] - 临时文件清理完成
2025-11-29 09:37:33,345 - VMProtectDevirtualizer - INFO - 临时文件清理完成
✓ 可视化结果生成完成
✓ x32dbg脚本已导出: output/scripts/x32dbg_analysis.py
✓ IDA脚本已导出: output/scripts/ida_analysis.py

关键分析结果:
  指令数量: 10815
  VM检测置信度: 0.00/10.0
  代码复杂度: 18.60/10.0
  估计版本: vmp1.x

进程已结束,退出代码0
主程序代码:
[Asm] 纯文本查看 复制代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
VMProtect Devirtualization Tool - Enhanced Main Module
修复 performance_stats 初始化顺序问题
"""

import logging
import yaml
import json
import argparse
import os
import sys
import traceback
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime

# 导入各模块
from modules.core.trace_parser import TraceParser
from modules.core.vm_detector import VMDetector
from modules.core.cfg_builder import CFGBuilder
from modules.analysis.complexity_analyzer import ComplexityAnalyzer
from modules.analysis.statistical_analyzer import StatisticalAnalyzer
from modules.analysis.pattern_analyzer import VMPatternAnalyzer
from modules.analysis.behavior_analyzer import VMBehaviorAnalyzer
from modules.analysis.version_estimator import VersionEstimator
from modules.execution.symbolic_engine import SymbolicEngine
from modules.execution.op_reconstructor import OpReconstructor
from modules.output.report_generator import ReportGenerator
from modules.output.ida_exporter import IDAExporter
from modules.utils.visualization import VMVisualization
from modules.output.x32dbg_exporter import x32dbgExporter
from modules.integration.lyscript_integration import LyScriptIntegration

class VMProtectDevirtualizer:
    """VMProtect反虚拟化主类 - 修复版"""

    def __init__(self, config_path: str = "config/config.yaml"):
        # 新增:在初始化开始时先设置 performance_stats
        self.performance_stats = {
            'start_time': datetime.now(),
            'module_initialization_time': {},
            'analysis_times': {}
        }

        # 确保配置路径是绝对路径
        if not os.path.isabs(config_path):
            # 获取项目根目录
            project_root = Path(__file__).parent
            self.config_path = project_root / config_path
        else:
            self.config_path = Path(config_path)

        # 先设置基础日志,然后再加载配置
        self._setup_basic_logging()

        # 加载配置
        self.config = self._load_config()

        # 根据配置重新设置日志
        self._setup_logging()

        # 创建必要的目录
        self._create_directories()

        # 初始化组件
        self._initialize_components()

        # 初始化LyScript集成
        self._initialize_lyscript_integration()

    def _initialize_lyscript_integration(self):
        """初始化LyScript集成"""
        try:
            lyscript_config = self.config.get('lyscript_integration', {})
            self.lyscript_integration = LyScriptIntegration(lyscript_config)

            # 记录集成状态
            capabilities = self.lyscript_integration.get_capabilities()
            self.logger.info(f"LyScript集成状态: {capabilities}")

        except Exception as e:
            self.logger.error(f"初始化LyScript集成失败: {e}")
            self.lyscript_integration = None

    def perform_dynamic_analysis(self, target_file: str) -> Dict[str, Any]:
        """
        执行动态分析

        Args:
            target_file: 目标文件路径

        Returns:
            Dict: 动态分析结果
        """
        if not self.lyscript_integration or not self.lyscript_integration.is_available():
            return {"error": "LyScript集成不可用"}

        self.logger.info(f"开始动态分析: {target_file}")
        return self.lyscript_integration.analyze_runtime_behavior(target_file)

    def _setup_basic_logging(self):
        """设置基础日志系统(在配置加载前使用)- 增强版"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler()],
            force=True  # 强制重新配置,避免重复日志
        )
        self.logger = logging.getLogger("VMProtectDevirtualizer")
        self.logger.info("基础日志系统初始化完成")

    def _load_config(self) -> Dict[str, Any]:
        """加载配置文件 - 增强版"""
        try:
            if not self.config_path.exists():
                self.logger.warning(f"配置文件不存在: {self.config_path},使用默认配置")
                return self._get_enhanced_default_config()

            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
                self.logger.info(f"已加载配置文件: {self.config_path}")

                # 新增:配置验证
                self._validate_config(config)

                return config
        except Exception as e:
            self.logger.error(f"加载配置文件失败: {e},使用默认配置")
            return self._get_enhanced_default_config()

    def _get_enhanced_default_config(self) -> Dict[str, Any]:
        """获取增强的默认配置"""
        return {
            'analysis': {
                'mode': 'deep',
                'max_instructions': 50000,
                'enable_advanced_detection': True,
                'timeout': 3600,  # 新增:超时设置
                'enable_progress_tracking': True  # 新增:进度跟踪
            },
            'vm_detection': {
                'confidence_threshold': 0.3,
                'enable_pattern_matching': True,
                'enable_heuristic_analysis': True,
                'enable_ml_detection': False,  # 新增:机器学习检测
                'signature_database': 'data/signatures/vmp_signatures.json'  # 新增:签名数据库
            },
            'symbolic_execution': {
                'max_paths': 100,
                'timeout': 300,
                'enable_constraint_solving': True,
                'max_memory_mb': 4096,  # 新增:内存限制
                'enable_z3_optimizations': True  # 新增:Z3优化
            },
            'logging': {
                'level': 'INFO',
                'file': 'logs/analysis.log',
                'max_file_size_mb': 10,  # 新增:日志文件大小限制
                'backup_count': 5  # 新增:日志备份数量
            },
            'complexity': {
                'enable_entropy_analysis': True,
                'enable_control_flow_analysis': True,
                'enable_cyclomatic_complexity': True  # 新增:圈复杂度分析
            },
            'output': {
                'formats': ['json', 'html', 'txt'],  # 新增:输出格式
                'generate_executive_summary': True,  # 新增:执行摘要
                'export_intermediate_results': False  # 新增:中间结果导出
            },
            # 保留原有配置项
            'trace_parser': {},
            'cfg_builder': {},
            'complexity_analyzer': {},
            'statistical_analyzer': {},
            'pattern_analyzer': {},
            'behavior_analyzer': {},
            'version_estimator': {},
            'op_reconstructor': {},
            'report_generator': {},
            'ida_exporter': {},
            'visualization': {}
        }

    def _validate_config(self, config: Dict[str, Any]):
        """验证配置有效性 - 新增功能"""
        required_sections = ['analysis', 'vm_detection', 'logging']

        for section in required_sections:
            if section not in config:
                self.logger.warning(f"配置缺少必要部分: {section},使用默认值")
                config[section] = self._get_enhanced_default_config()[section]

        # 验证分析模式
        valid_modes = ['standard', 'deep', 'advanced', 'quick']
        analysis_mode = config.get('analysis', {}).get('mode', 'deep')
        if analysis_mode not in valid_modes:
            self.logger.warning(f"无效的分析模式: {analysis_mode},使用默认值 'deep'")
            config['analysis']['mode'] = 'deep'

    def _setup_logging(self):
        """根据配置设置完整的日志系统 - 增强版"""
        log_config = self.config.get('logging', {})
        log_level = log_config.get('level', 'INFO')
        log_file = log_config.get('file', 'logs/analysis.log')
        max_file_size = log_config.get('max_file_size_mb', 10) * 1024 * 1024  # 转换为字节
        backup_count = log_config.get('backup_count', 5)

        # 确保日志文件路径是字符串
        if isinstance(log_file, dict):
            log_file = 'logs/analysis.log'
            self.logger.warning(f"日志文件配置是字典而不是字符串,使用默认值: {log_file}")

        # 创建日志目录
        log_dir = Path(log_file).parent
        log_dir.mkdir(parents=True, exist_ok=True)

        # 清除现有的处理器
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        # 重新配置日志
        self.logger.setLevel(getattr(logging, log_level.upper()))

        # 创建格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
        )

        # 添加文件处理器(带轮转)
        try:
            from logging.handlers import RotatingFileHandler
            file_handler = RotatingFileHandler(
                str(log_file),
                maxBytes=max_file_size,
                backupCount=backup_count,
                encoding='utf-8'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
        except Exception as e:
            self.logger.error(f"创建轮转文件日志处理器失败: {e}")
            # 回退到普通文件处理器
            try:
                file_handler = logging.FileHandler(str(log_file), encoding='utf-8')
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)
            except Exception as e2:
                self.logger.error(f"创建普通文件日志处理器也失败: {e2}")

        # 添加控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        self.logger.info(f"日志系统重新配置完成,级别: {log_level}")

    def _create_directories(self):
        """创建必要的目录结构 - 增强版"""
        directories = [
            'logs',
            'output/reports',
            'output/graphs',
            'output/scripts',
            'output/debug_samples',
            'output/temp',  # 新增:临时目录
            'data/signatures',
            'data/patterns',
            'data/training',  # 新增:训练数据目录
            'models',
            'templates',
            'vmp_traces',
            'cache'  # 新增:缓存目录
        ]

        for dir_path in directories:
            try:
                Path(dir_path).mkdir(parents=True, exist_ok=True)
                self.logger.debug(f"确保目录存在: {dir_path}")
            except Exception as e:
                self.logger.warning(f"创建目录失败 {dir_path}: {e}")

    def _initialize_components(self):
        """初始化所有组件 - 修复版"""
        self.logger.info("初始化VMProtect反虚拟化分析组件...")

        try:
            # 核心模块
            start_time = datetime.now()
            self.trace_parser = TraceParser(self.config.get('trace_parser', {}))
            self.vm_detector = VMDetector(self.config.get('vm_detection', {}))
            self.cfg_builder = CFGBuilder(self.config.get('cfg_builder', {}))
            self.performance_stats['module_initialization_time']['core'] = (
                    datetime.now() - start_time
            )

            # 分析模块
            start_time = datetime.now()
            self.complexity_analyzer = ComplexityAnalyzer(self.config.get('complexity_analyzer', {}))
            self.statistical_analyzer = StatisticalAnalyzer(self.config.get('statistical_analyzer', {}))
            self.pattern_analyzer = VMPatternAnalyzer(self.config.get('pattern_analyzer', {}))
            self.behavior_analyzer = VMBehaviorAnalyzer(self.config.get('behavior_analyzer', {}))
            self.version_estimator = VersionEstimator(self.config.get('version_estimator', {}))
            self.performance_stats['module_initialization_time']['analysis'] = (
                    datetime.now() - start_time
            )

            # 执行模块
            start_time = datetime.now()
            self.symbolic_engine = SymbolicEngine(self.config.get('symbolic_execution', {}))
            self.op_reconstructor = OpReconstructor(self.config.get('op_reconstructor', {}))
            self.performance_stats['module_initialization_time']['execution'] = (
                    datetime.now() - start_time
            )

            # 输出模块
            start_time = datetime.now()
            self.report_generator = ReportGenerator(self.config.get('report_generator', {}))
            self.ida_exporter = IDAExporter(self.config.get('ida_exporter', {}))
            self.x32dbg_exporter = x32dbgExporter(self.config.get('x32dbg_exporter', {}))
            self.performance_stats['module_initialization_time']['output'] = (
                    datetime.now() - start_time
            )

            # 工具模块 - 修复可视化工具初始化
            start_time = datetime.now()
            visualization_config = self.config.get('visualization', {})
            # 确保配置是字典并且包含正确的 output_dir
            if not isinstance(visualization_config, dict):
                visualization_config = {}

            # 如果 output_dir 是字典,修复它
            if 'output_dir' in visualization_config and isinstance(visualization_config['output_dir'], dict):
                visualization_config['output_dir'] = 'output/graphs'
                self.logger.warning("修复可视化配置中的 output_dir")

            self.visualizer = VMVisualization(visualization_config)
            self.performance_stats['module_initialization_time']['utils'] = (
                    datetime.now() - start_time
            )

            # 新增:性能监控
            total_init_time = sum(
                (dt.total_seconds() for dt in self.performance_stats['module_initialization_time'].values()),
                0
            )
            self.logger.info(f"组件初始化完成,总耗时: {total_init_time:.2f}秒")

        except Exception as e:
            self.logger.error(f"初始化组件失败: {e}")
            self.logger.error(traceback.format_exc())
            raise

    def analyze_trace(self, trace_file: str, analysis_mode: str = None) -> Dict[str, Any]:
        """分析跟踪文件 - 增强版"""
        try:
            # 使用配置中的分析模式,或使用传入的参数
            analysis_config = self.config.get('analysis', {})
            mode = analysis_mode or analysis_config.get('mode', 'standard')
            max_instructions = analysis_config.get('max_instructions', 50000)
            timeout = analysis_config.get('timeout', 3600)

            # 新增:分析会话标识
            session_id = datetime.now().strftime('%Y%m%d_%H%M%S')
            self.logger.info(f"分析会话ID: {session_id}")

            # 确保跟踪文件路径正确
            trace_file = self._resolve_trace_file_path(trace_file)

            self.logger.info(f"开始分析跟踪文件: {trace_file} (模式: {mode})")
            analysis_results = {
                'session_id': session_id,
                'analysis_mode': mode,
                'trace_file': str(trace_file),
                'start_time': datetime.now().isoformat()
            }

            # 步骤1: 解析跟踪文件
            self.logger.info("步骤1: 解析跟踪文件...")
            instructions = self.trace_parser.parse_trace(trace_file)

            if not instructions:
                self.logger.warning("没有解析到任何指令")
                analysis_results['error'] = "没有解析到任何指令"
                return analysis_results

            # 限制指令数量
            if len(instructions) > max_instructions:
                self.logger.warning(f"指令数量超过限制 ({len(instructions)} > {max_instructions}),进行截断")
                instructions = instructions[:max_instructions]

            analysis_results['instruction_count'] = len(instructions)
            analysis_results['sample_instructions'] = self._safe_sample_instructions(instructions)

            self.logger.info(f"解析到 {len(instructions)} 条指令")

            # 步骤2: 检测VM特征
            self.logger.info("步骤2: 检测VMProtect虚拟化特征...")
            vm_features = self.vm_detector.detect_vm_features(instructions)
            analysis_results['vm_features'] = vm_features

            # 步骤3: 构建控制流图
            self.logger.info("步骤3: 构建控制流图...")
            cfg = self.cfg_builder.build_cfg(instructions)
            analysis_results['control_flow_graph'] = cfg

            # 步骤4: 执行复杂度分析
            self.logger.info("步骤4: 执行复杂度分析...")
            complexity = self.complexity_analyzer.analyze(instructions, cfg)
            analysis_results['complexity'] = complexity

            # 步骤5: 统计分析
            self.logger.info("步骤5: 执行统计分析...")
            statistics = self.statistical_analyzer.analyze(instructions)
            analysis_results['statistics'] = statistics

            # 步骤6: 模式分析
            self.logger.info("步骤6: 执行模式分析...")
            patterns = self.pattern_analyzer.analyze_patterns(instructions, cfg)
            analysis_results['patterns'] = patterns

            # 步骤7: 行为分析
            self.logger.info("步骤7: 执行行为分析...")
            behavior = self.behavior_analyzer.analyze_behavior(instructions, vm_features)
            analysis_results['behavior'] = behavior

            # 步骤8: 版本估算
            self.logger.info("步骤8: 估算VMProtect版本...")
            version_info = self.version_estimator.estimate(analysis_results)
            analysis_results['version_estimation'] = version_info

            # 深度分析模式
            if mode in ["deep", "advanced"]:
                self.logger.info(f"【{mode.upper()}模式】启用增强分析流程...")

                # 符号执行分析
                self.logger.info("→ 执行符号执行分析...")
                symbolic_results = self.symbolic_engine.analyze_symbolically(instructions, cfg)
                analysis_results['symbolic_analysis'] = symbolic_results

                # 操作重建
                self.logger.info("→ 重建原始操作...")
                reconstructed_ops = self.op_reconstructor.reconstruct_operations(instructions, vm_features)
                analysis_results['reconstructed_operations'] = reconstructed_ops

                # 高级模式额外分析
                if mode == "advanced":
                    self.logger.info("→ 执行高级分析...")
                    advanced_results = self._perform_advanced_analysis(instructions, analysis_results)
                    analysis_results['advanced_analysis'] = advanced_results

            # 生成建议
            analysis_results['recommendations'] = self._generate_enhanced_recommendations(analysis_results)

            # 导出样本指令用于调试
            self._export_sample_instructions(instructions, analysis_results)

            # 新增:性能统计
            analysis_results['end_time'] = datetime.now().isoformat()
            analysis_duration = datetime.fromisoformat(analysis_results['end_time']) - datetime.fromisoformat(
                analysis_results['start_time'])
            analysis_results['analysis_duration_seconds'] = analysis_duration.total_seconds()

            self.logger.info(f"分析完成!总耗时: {analysis_duration.total_seconds():.2f}秒")
            return analysis_results

        except Exception as e:
            self.logger.error(f"分析过程中发生错误: {e}")
            self.logger.error(traceback.format_exc())
            analysis_results['error'] = str(e)
            analysis_results['error_traceback'] = traceback.format_exc()
            return analysis_results

    def _resolve_trace_file_path(self, trace_file: str) -> Path:
        """解析跟踪文件路径 - 新增功能"""
        trace_path = Path(trace_file)

        # 如果是绝对路径且存在
        if trace_path.is_absolute() and trace_path.exists():
            return trace_path

        # 尝试相对路径
        if trace_path.exists():
            return trace_path

        # 尝试在 vmp_traces 目录中查找
        vmp_traces_path = Path("vmp_traces") / trace_file
        if vmp_traces_path.exists():
            return vmp_traces_path

        # 尝试其他可能的位置
        possible_locations = [
            Path("vmp_traces") / trace_path.name,
            Path("data") / trace_path.name,
            Path(".") / trace_path.name
        ]

        for location in possible_locations:
            if location.exists():
                return location

        raise FileNotFoundError(f"找不到跟踪文件: {trace_file}")

    def export_to_x32dbg(self, analysis_results: Dict, output_path: str = None) -> bool:
        """导出到x32dbg脚本 - 新增功能"""
        try:
            if output_path is None:
                # 生成默认路径
                session_id = analysis_results.get('session_id', datetime.now().strftime('%Y%m%d_%H%M%S'))
                output_path = f"output/scripts/x32dbg_analysis_{session_id}.py"

            self.logger.info(f"导出x32dbg脚本到: {output_path}")

            # 确保输出目录存在
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)

            success = self.x32dbg_exporter.export_analysis(analysis_results, output_path)

            if success:
                self.logger.info(f"x32dbg脚本导出成功: {output_path}")

                # 同时导出x64dbg格式
                x64dbg_path = output_path.replace('.py', '_x64dbg.txt')
                self.x32dbg_exporter.export_x64dbg_script(analysis_results, x64dbg_path)

                return True
            else:
                self.logger.error("x32dbg导出器返回失败状态")
                return False

        except Exception as e:
            self.logger.error(f"导出x32dbg脚本失败: {e}")
            self.logger.error(traceback.format_exc())
            return False

    def _safe_sample_instructions(self, instructions: List[Dict], sample_size: int = 10) -> List[Dict]:
        """安全地采样指令,处理序列化问题 - 新增功能"""
        sample = []
        for instr in instructions[:sample_size]:
            try:
                # 确保指令数据可序列化
                safe_instr = {}
                for key, value in instr.items():
                    if isinstance(value, (str, int, float, bool, type(None))):
                        safe_instr[key] = value
                    elif isinstance(value, bytes):
                        safe_instr[key] = value.hex()  # 将bytes转换为hex字符串
                    else:
                        safe_instr[key] = str(value)  # 其他类型转换为字符串
                sample.append(safe_instr)
            except Exception as e:
                self.logger.warning(f"处理指令样本时出错: {e}")
                continue
        return sample

    def _perform_advanced_analysis(self, instructions: List[Dict], analysis_results: Dict) -> Dict[str, Any]:
        """执行高级分析 - 新增功能"""
        advanced_results = {}

        try:
            # 1. 交叉引用分析
            self.logger.info("  - 执行交叉引用分析...")
            xref_analysis = self._analyze_cross_references(instructions)
            advanced_results['cross_references'] = xref_analysis

            # 2. 数据流分析
            self.logger.info("  - 执行数据流分析...")
            data_flow_analysis = self._analyze_data_flow(instructions)
            advanced_results['data_flow'] = data_flow_analysis

            # 3. 调用图分析
            self.logger.info("  - 构建调用图...")
            call_graph = self._build_call_graph(instructions)
            advanced_results['call_graph'] = call_graph

            # 4. 模式匹配增强
            self.logger.info("  - 执行增强模式匹配...")
            enhanced_patterns = self._enhanced_pattern_matching(instructions, analysis_results)
            advanced_results['enhanced_patterns'] = enhanced_patterns

        except Exception as e:
            self.logger.warning(f"高级分析部分失败: {e}")
            advanced_results['error'] = str(e)

        return advanced_results

    def _analyze_cross_references(self, instructions: List[Dict]) -> Dict[str, Any]:
        """分析交叉引用 - 新增功能"""
        # 简化的交叉引用分析实现
        xrefs = {
            'jump_references': [],
            'call_references': [],
            'data_references': []
        }

        for i, instr in enumerate(instructions):
            # if i >= 1000:  # 限制分析数量
            #     break

            # 这里可以实现实际的交叉引用分析逻辑
            # 目前返回简化结果
            pass

        return xrefs

    def _analyze_data_flow(self, instructions: List[Dict]) -> Dict[str, Any]:
        """数据流分析 - 新增功能"""
        return {
            'status': 'simplified_implementation',
            'message': '完整的数据流分析需要更复杂的实现'
        }

    def _build_call_graph(self, instructions: List[Dict]) -> Dict[str, Any]:
        """构建调用图 - 新增功能"""
        return {
            'status': 'simplified_implementation',
            'message': '完整的调用图构建需要更复杂的实现'
        }

    def _enhanced_pattern_matching(self, instructions: List[Dict], analysis_results: Dict) -> Dict[str, Any]:
        """增强模式匹配 - 新增功能"""
        return {
            'status': 'simplified_implementation',
            'message': '增强模式匹配需要额外的模式数据库'
        }

    def _generate_enhanced_recommendations(self, analysis_results: Dict) -> List[str]:
        """生成增强的分析建议 - 改进版"""
        recommendations = []

        try:
            complexity = analysis_results.get('complexity', {})
            vm_features = analysis_results.get('vm_features', {})
            patterns = analysis_results.get('patterns', [])
            version_info = analysis_results.get('version_estimation', {})
            mode = analysis_results.get('analysis_mode', 'standard')

            # 处理保护级别
            protection_level_data = complexity.get('protection_level', {})
            if isinstance(protection_level_data, dict):
                level = protection_level_data.get('level', 'medium')
            else:
                level = str(protection_level_data)

            level_lower = level.lower()

            # 基于保护级别生成建议
            protection_recommendations = {
                'high': [
                    "检测到高强度保护,建议使用深度符号执行模式",
                    "考虑使用多路径分析方法处理复杂控制流",
                    "可能需要手动分析关键handler函数",
                    "建议分阶段分析,先识别关键组件"
                ],
                'very_high': [
                    "检测到极高强度保护,需要专业级分析工具",
                    "建议结合动态分析和静态分析",
                    "考虑使用硬件辅助分析技术",
                    "可能需要专家级逆向工程技能"
                ],
                'medium': [
                    "中等保护级别,标准分析方法可能有效",
                    "建议结合动态分析验证结果",
                    "可以尝试自动化重建技术"
                ],
                'low': [
                    "低保护级别,可以尝试快速分析方法",
                    "自动化工具可能获得较好效果",
                    "适合初学者学习VM分析技术"
                ]
            }

            recommendations.extend(protection_recommendations.get(level_lower, [
                "根据复杂度分析调整分析策略"
            ]))

            # 基于VM特征生成建议
            if vm_features.get('has_anti_debug', False):
                recommendations.append("检测到反调试技术,建议在隔离环境中分析")
                recommendations.append("考虑使用虚拟机或沙箱环境")

            if vm_features.get('has_obfuscated_stack', False):
                recommendations.append("检测到堆栈混淆,需要详细的堆栈跟踪分析")
                recommendations.append("建议使用符号执行跟踪堆栈操作")

            if vm_features.get('has_virtual_registers', False):
                recommendations.append("检测到虚拟寄存器,需要寄存器映射分析")
                recommendations.append("建议分析寄存器分配模式")

            # 基于版本信息生成建议
            estimated_version = version_info.get('estimated_version', 'unknown')
            if estimated_version != 'unknown':
                recommendations.append(f"检测到 {estimated_version},使用对应的分析技术")

            # 基于分析模式生成建议
            if mode == 'standard':
                recommendations.append("当前使用标准模式,可尝试深度模式获得更详细分析")
            elif mode == 'deep':
                recommendations.append("深度模式已启用,分析结果较为详细")
            elif mode == 'advanced':
                recommendations.append("高级模式已启用,包含额外分析功能")

            # 基于检测到的模式生成建议
            detected_patterns = set(patterns)

            pattern_recommendations = {
                'virtual_machine_enter': "已识别VM入口点,可以开始指令重建",
                'control_flow_flattening': "检测到控制流平坦化,需要特殊处理CFG重建",
                'opaque_predicates': "检测到不透明谓词,需要符号执行破解",
                'instruction_splitting': "检测到指令分割,需要指令重组分析",
                'virtual_stack': "检测到虚拟栈操作,需要栈跟踪分析",
                'multiple_dispatchers': "检测到多分发器,需要分别分析每个分发器"
            }

            for pattern, recommendation in pattern_recommendations.items():
                if pattern in detected_patterns:
                    recommendations.append(recommendation)

            # 基于复杂度分数生成建议
            complexity_score = complexity.get('complexity_score', 0)
            if complexity_score > 80:
                recommendations.append("高复杂度代码,建议分阶段分析")
                recommendations.append("优先分析关键路径和核心功能")
            elif complexity_score > 60:
                recommendations.append("中等复杂度,系统分析方法适用")
            elif complexity_score < 30:
                recommendations.append("低复杂度,可以快速完成分析")

            # 去重并限制数量
            unique_recommendations = list(dict.fromkeys(recommendations))
            return unique_recommendations[:15]  # 限制建议数量

        except Exception as e:
            self.logger.error(f"生成建议时发生错误: {e}")
            return ["分析完成,但生成建议时遇到问题 - 请查看详细日志"]

    def _export_sample_instructions(self, instructions: List[Dict], analysis_results: Dict):
        """导出样本指令用于调试 - 增强版"""
        try:
            sample_dir = Path("output/debug_samples")
            sample_dir.mkdir(parents=True, exist_ok=True)

            sample_data = {
                'timestamp': datetime.now().isoformat(),
                'session_id': analysis_results.get('session_id', 'unknown'),
                'total_instructions': len(instructions),
                'sample_instructions': self._safe_sample_instructions(instructions, 50),
                'analysis_summary': {
                    'vm_confidence': analysis_results.get('vm_features', {}).get('vm_confidence_score', 0),
                    'complexity_score': analysis_results.get('complexity', {}).get('complexity_score', 0),
                    'pattern_count': len(analysis_results.get('patterns', [])),
                    'estimated_version': analysis_results.get('version_estimation', {}).get('estimated_version',
                                                                                            'unknown')
                },
                'performance': {
                    'instruction_count': len(instructions),
                    'analysis_mode': analysis_results.get('analysis_mode', 'standard')
                }
            }

            sample_file = sample_dir / f"sample_analysis_{analysis_results.get('session_id', 'unknown')}.json"
            with open(sample_file, 'w', encoding='utf-8') as f:
                json.dump(sample_data, f, indent=2, ensure_ascii=False, default=str)

            self.logger.info(f"已导出分析样本到: {sample_file}")

        except Exception as e:
            self.logger.warning(f"导出样本指令失败: {e}")

    def generate_report(self, analysis_results: Dict, output_format: str = "html") -> str:
        """生成分析报告 - 增强版"""
        try:
            self.logger.info(f"生成{output_format.upper()}格式报告...")

            # 支持多种输出格式
            output_config = self.config.get('output', {})
            formats = output_config.get('formats', ['json', 'html', 'txt'])

            if output_format not in formats:
                self.logger.warning(f"请求的输出格式 {output_format} 不在配置中,使用HTML格式")
                output_format = 'html'

            report_path = self.report_generator.generate_report(analysis_results, output_format)

            if report_path:
                self.logger.info(f"报告生成成功: {report_path}")

                # 生成执行摘要(如果启用)
                if output_config.get('generate_executive_summary', True):
                    self._generate_executive_summary(analysis_results)

                return report_path
            else:
                self.logger.error("报告生成器返回空路径")
                return ""

        except Exception as e:
            self.logger.error(f"生成报告失败: {e}")
            self.logger.error(traceback.format_exc())
            return ""

    def _generate_executive_summary(self, analysis_results: Dict):
        """生成执行摘要 - 新增功能"""
        try:
            summary_file = Path(
                "output/reports") / f"executive_summary_{analysis_results.get('session_id', 'unknown')}.txt"

            with open(summary_file, 'w', encoding='utf-8') as f:
                f.write("=" * 60 + "\n")
                f.write("VMProtect分析执行摘要\n")
                f.write("=" * 60 + "\n\n")

                # 基本信息
                f.write(f"分析会话: {analysis_results.get('session_id', 'N/A')}\n")
                f.write(f"分析文件: {analysis_results.get('trace_file', 'N/A')}\n")
                f.write(f"指令数量: {analysis_results.get('instruction_count', 0)}\n")
                f.write(f"分析模式: {analysis_results.get('analysis_mode', 'N/A')}\n")
                f.write(f"分析时长: {analysis_results.get('analysis_duration_seconds', 0):.2f}秒\n\n")

                # 关键发现
                f.write("关键发现:\n")
                f.write("-" * 40 + "\n")

                vm_confidence = analysis_results.get('vm_features', {}).get('vm_confidence_score', 0)
                f.write(f"VM检测置信度: {vm_confidence:.2f}/10.0\n")

                complexity_score = analysis_results.get('complexity', {}).get('complexity_score', 0)
                f.write(f"代码复杂度: {complexity_score:.2f}/10.0\n")

                version = analysis_results.get('version_estimation', {}).get('estimated_version', 'unknown')
                f.write(f"估计版本: {version}\n")

                pattern_count = len(analysis_results.get('patterns', []))
                f.write(f"检测模式: {pattern_count}种\n\n")

                # 建议摘要
                recommendations = analysis_results.get('recommendations', [])
                if recommendations:
                    f.write("主要建议:\n")
                    f.write("-" * 40 + "\n")
                    for i, rec in enumerate(recommendations[:5], 1):
                        f.write(f"{i}. {rec}\n")

            self.logger.info(f"执行摘要已生成: {summary_file}")

        except Exception as e:
            self.logger.warning(f"生成执行摘要失败: {e}")

    def export_to_ida(self, analysis_results: Dict, output_path: str = None) -> bool:
        """导出到IDA脚本 - 增强版"""
        try:
            if output_path is None:
                # 生成默认路径
                session_id = analysis_results.get('session_id', datetime.now().strftime('%Y%m%d_%H%M%S'))
                output_path = f"output/scripts/ida_analysis_{session_id}.py"

            self.logger.info(f"导出IDA脚本到: {output_path}")

            # 确保输出目录存在
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)

            success = self.ida_exporter.export_analysis(analysis_results, output_path)

            if success:
                self.logger.info(f"IDA脚本导出成功: {output_path}")
                return True
            else:
                self.logger.error("IDA导出器返回失败状态")
                return False

        except Exception as e:
            self.logger.error(f"导出IDA脚本失败: {e}")
            self.logger.error(traceback.format_exc())
            return False

    def visualize_results(self, analysis_results: Dict) -> None:
        """可视化分析结果 - 增强版"""
        try:
            self.logger.info("生成可视化结果...")

            # 检查是否有足够的数据进行可视化
            if analysis_results.get('instruction_count', 0) == 0:
                self.logger.warning("没有指令数据,跳过可视化")
                return

            # 1. 生成基础可视化报告
            self.visualizer.export_visualization_report(analysis_results)

            # 2. 生成交互式仪表板
            self.logger.info("生成交互式仪表板...")
            self.visualizer.create_interactive_dashboard(
                analysis_results,
                filename="interactive_dashboard.html"
            )

            # 3. 生成分析仪表盘
            self.logger.info("生成分析仪表盘...")
            dashboard_path = self.visualizer.create_analysis_dashboard(analysis_results)
            if dashboard_path:
                self.logger.info(f"分析仪表盘已生成: {dashboard_path}")

            # 4. 生成专业分析图表
            self.logger.info("生成专业分析图表...")

            # 控制流图
            cfg_data = analysis_results.get('control_flow_graph', {})
            if cfg_data:
                self.visualizer.plot_control_flow_graph(
                    cfg_data,
                    title="VM控制流分析",
                    filename="detailed_control_flow.png"
                )

            # 指令分析
            instructions = analysis_results.get('instructions', [])
            if not instructions:
                instructions = analysis_results.get('sample_instructions', [])
            if instructions:
                self.visualizer.plot_instruction_analysis(
                    instructions,
                    filename="detailed_instruction_analysis.png"
                )

            # VM架构图
            vm_data = analysis_results.get('vm_analysis', {})
            if not vm_data:
                vm_data = analysis_results.get('vm_features', {})
            if vm_data:
                self.visualizer.plot_vm_architecture(
                    vm_data,
                    filename="vm_architecture_detailed.png"
                )

            # 5. 生成HTML报告(包含图表)
            self.logger.info("生成带图表的HTML报告...")
            # 使用可视化器的输出目录
            html_report_path = self.visualizer.output_dir / "visualization_report.html"
            self.visualizer.generate_html_report_with_plots(
                analysis_results,
                html_report_path
            )

            self.logger.info("增强可视化报告生成完成")

            # 显示生成的文件
            visualization_files = list(self.visualizer.output_dir.glob("*"))
            if visualization_files:
                self.logger.info("生成的可视化文件:")
                for file in visualization_files:
                    self.logger.info(f"  - {file.name}")

        except Exception as e:
            self.logger.error(f"可视化结果失败: {e}")
            self.logger.error(traceback.format_exc())

    def get_performance_stats(self) -> Dict[str, Any]:
        """获取性能统计 - 新增功能"""
        total_time = datetime.now() - self.performance_stats['start_time']

        stats = {
            'total_uptime_seconds': total_time.total_seconds(),
            'module_initialization_times': {},
            'analysis_times': self.performance_stats.get('analysis_times', {}),
            'memory_usage_mb': self._get_memory_usage()
        }

        # 转换时间为秒
        for module, time_delta in self.performance_stats['module_initialization_time'].items():
            stats['module_initialization_times'][module] = time_delta.total_seconds()

        return stats

    def _get_memory_usage(self) -> float:
        """获取内存使用情况 - 新增功能"""
        try:
            import psutil
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024  # 转换为MB
        except ImportError:
            return 0.0
        except Exception:
            return 0.0

    def cleanup(self):
        """清理资源 - 新增功能"""
        self.logger.info("执行清理操作...")

        # 清理临时文件
        temp_dir = Path("output/temp")
        if temp_dir.exists():
            try:
                for temp_file in temp_dir.glob("*"):
                    try:
                        temp_file.unlink()
                    except:
                        pass
                self.logger.info("临时文件清理完成")
            except Exception as e:
                self.logger.warning(f"清理临时文件失败: {e}")


def main():
    """主函数 - 增强版"""
    parser = argparse.ArgumentParser(
        description='VMProtect Devirtualization Tool - Enhanced Version',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python main.py -t sample.vmp.trace -m deep
  python main.py --config my_config.yaml --trace /path/to/trace.file
  python main.py --mode advanced --trace vmp_traces/sample1.vmp.trace
        """
    )

    parser.add_argument('--config', '-c', default='config/config.yaml',
                        help='配置文件路径 (默认: config/config.yaml)')
    parser.add_argument('--trace', '-t', default='vmp_traces/sample1.vmp.trace',
                        help='跟踪文件路径 (默认: vmp_traces/sample1.vmp.trace)')
    parser.add_argument('--mode', '-m', choices=['standard', 'deep', 'advanced', 'quick'],
                        help='分析模式 (覆盖配置文件中的设置)')
    parser.add_argument('--output-dir', '-o',
                        help='输出目录 (覆盖配置中的设置)')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='详细输出模式')
    parser.add_argument('--benchmark', action='store_true',
                        help='启用性能基准测试')

    args = parser.parse_args()

    try:
        # 初始化分析器
        analyzer = VMProtectDevirtualizer(args.config)

        # 设置详细日志
        if args.verbose:
            analyzer.logger.setLevel(logging.DEBUG)
            for handler in analyzer.logger.handlers:
                handler.setLevel(logging.DEBUG)

        # 分析跟踪文件
        analysis_results = analyzer.analyze_trace(args.trace, analysis_mode=args.mode)

        # 检查分析是否成功
        if 'error' in analysis_results:
            print(f"分析过程中出现错误: {analysis_results['error']}")
            if args.verbose:
                print(f"详细错误: {analysis_results.get('error_traceback', '无')}")
            return 1

        # 生成报告
        report_path = analyzer.generate_report(analysis_results, "html")
        if report_path:
            print(f"&#10003; 分析报告已生成: {report_path}")
        else:
            print("&#10007; 生成报告失败")

        # 可视化结果
        analyzer.visualize_results(analysis_results)
        print("&#10003; 可视化结果生成完成")

        # 导出x32dbg脚本 - 新增
        x32dbg_script_path = "output/scripts/x32dbg_analysis.py"
        if analyzer.export_to_x32dbg(analysis_results, x32dbg_script_path):
            print(f"&#10003; x32dbg脚本已导出: {x32dbg_script_path}")
        else:
            print("&#10007; 导出x32dbg脚本失败")

        # 导出IDA脚本
        ida_script_path = "output/scripts/ida_analysis.py"
        if analyzer.export_to_ida(analysis_results, ida_script_path):
            print(f"&#10003; IDA脚本已导出: {ida_script_path}")
        else:
            print("&#10007; 导出IDA脚本失败")

        # 性能统计
        if args.benchmark:
            stats = analyzer.get_performance_stats()
            print(f"\n性能统计:")
            print(f"  总运行时间: {stats['total_uptime_seconds']:.2f}秒")
            print(f"  内存使用: {stats['memory_usage_mb']:.2f} MB")
            print(f"  分析耗时: {analysis_results.get('analysis_duration_seconds', 0):.2f}秒")

        # 显示关键结果
        print(f"\n关键分析结果:")
        print(f"  指令数量: {analysis_results.get('instruction_count', 0)}")
        vm_confidence = analysis_results.get('vm_features', {}).get('vm_confidence_score', 0)
        print(f"  VM检测置信度: {vm_confidence:.2f}/10.0")
        complexity = analysis_results.get('complexity', {}).get('complexity_score', 0)
        print(f"  代码复杂度: {complexity:.2f}/10.0")
        version = analysis_results.get('version_estimation', {}).get('estimated_version', 'unknown')
        print(f"  估计版本: {version}")

        # 清理资源
        analyzer.cleanup()

        return 0

    except Exception as e:
        print(f"分析失败: {e}")
        if args.verbose:
            traceback.print_exc()
        return 1


if __name__ == "__main__":
    exit(main())


几个核心模块:
1、cfg_builder.py
[Asm] 纯文本查看 复制代码
import logging
from typing import Dict, List, Any, Set, Tuple, Optional
from collections import defaultdict, deque


class CFGBuilder:
    """控制流图构建器"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)

        # CFG数据结构
        self.nodes = {}  # 基本块节点
        self.edges = []  # 控制流边
        self.entry_points = []  # 入口点

    def build_cfg(self, instructions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        构建控制流图 - 兼容性方法

        Args:
            instructions: 指令列表

        Returns:
            Dict: CFG分析结果
        """
        # 使用空的VM特征进行构建
        vm_features = {}
        return self.build_from_trace(instructions, vm_features)

    def build_from_trace(self, instructions: List[Dict[str, Any]],
                         vm_features: Dict[str, Any]) -> Dict[str, Any]:
        """
        从跟踪数据构建控制流图

        Args:
            instructions: 指令列表
            vm_features: VM特征检测结果

        Returns:
            Dict: CFG分析结果
        """
        self.logger.info("构建VM控制流图...")

        if not instructions:
            return {'error': 'No instructions provided'}

        try:
            # 识别基本块
            basic_blocks = self._identify_basic_blocks(instructions)

            # 构建CFG节点
            self._build_cfg_nodes(basic_blocks)

            # 构建CFG边
            self._build_cfg_edges(basic_blocks, instructions)

            # 识别VM特定结构
            vm_structures = self._identify_vm_structures(basic_blocks, vm_features)

            # 分析CFG特征
            cfg_analysis = self._analyze_cfg_features()

            result = {
                'nodes': self.nodes,
                'edges': self.edges,
                'entry_points': self.entry_points,
                'vm_structures': vm_structures,
                'analysis': cfg_analysis,
                'basic_block_count': len(basic_blocks),
                'edge_count': len(self.edges)
            }

            self.logger.info(f"CFG构建完成: {len(basic_blocks)} 个基本块, {len(self.edges)} 条边")
            return result

        except Exception as e:
            self.logger.error(f"CFG构建失败: {e}")
            return {'error': str(e)}

    def _identify_basic_blocks(self, instructions: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
        """识别基本块"""
        basic_blocks = []
        current_block = []

        for i, instr in enumerate(instructions):
            # 基本块开始
            if not current_block:
                current_block.append(instr)
            else:
                # 控制流指令结束当前块
                if self._is_control_flow_instruction(instr):
                    current_block.append(instr)
                    basic_blocks.append(current_block)
                    current_block = []
                else:
                    current_block.append(instr)

        # 添加最后一个块
        if current_block:
            basic_blocks.append(current_block)

        return basic_blocks

    def _build_cfg_nodes(self, basic_blocks: List[List[Dict[str, Any]]]):
        """构建CFG节点"""
        for i, block in enumerate(basic_blocks):
            if not block:
                continue

            start_addr = block[0].get('address', 0)
            end_addr = block[-1].get('address', 0)

            self.nodes[i] = {
                'id': i,
                'start_address': start_addr,
                'start_address_hex': f"0x{start_addr:x}",
                'end_address': end_addr,
                'end_address_hex': f"0x{end_addr:x}",
                'instruction_count': len(block),
                'instructions': block,
                'type': self._classify_block_type(block),
                'vm_indicators': self._detect_block_vm_indicators(block)
            }

            # 识别入口点
            if i == 0 or self._is_entry_block(block):
                self.entry_points.append(i)

    def _build_cfg_edges(self, basic_blocks: List[List[Dict[str, Any]]],
                         instructions: List[Dict[str, Any]]):
        """构建CFG边"""
        for i, block in enumerate(basic_blocks):
            if not block:
                continue

            last_instr = block[-1]

            # 顺序执行边
            if i < len(basic_blocks) - 1 and not self._is_unconditional_branch(last_instr):
                self.edges.append({
                    'from': i,
                    'to': i + 1,
                    'type': 'sequential',
                    'condition': 'fallthrough'
                })

            # 控制流边
            if self._is_control_flow_instruction(last_instr):
                target_blocks = self._find_branch_targets(last_instr, basic_blocks, instructions)
                for target in target_blocks:
                    edge_type = 'conditional' if self._is_conditional_branch(last_instr) else 'unconditional'
                    self.edges.append({
                        'from': i,
                        'to': target,
                        'type': edge_type,
                        'condition': self._get_branch_condition(last_instr)
                    })

    def _identify_vm_structures(self, basic_blocks: List[List[Dict[str, Any]]],
                                vm_features: Dict[str, Any]) -> Dict[str, Any]:
        """识别VM特定结构"""
        structures = {
            'dispatcher_blocks': [],
            'handler_blocks': [],
            'vm_entry_blocks': [],
            'vm_exit_blocks': []
        }

        for block_id, block_data in self.nodes.items():
            block = block_data['instructions']

            # 识别分发器块
            if self._is_dispatcher_block(block):
                structures['dispatcher_blocks'].append({
                    'block_id': block_id,
                    'address': block_data['start_address_hex'],
                    'evidence': 'indirect_branches'
                })

            # 识别处理器块
            elif self._is_handler_block(block):
                structures['handler_blocks'].append({
                    'block_id': block_id,
                    'address': block_data['start_address_hex'],
                    'instruction_count': len(block),
                    'vm_instruction_ratio': self._calculate_vm_instruction_ratio(block)
                })

            # 识别VM入口块
            elif self._is_vm_entry_block(block):
                structures['vm_entry_blocks'].append({
                    'block_id': block_id,
                    'address': block_data['start_address_hex']
                })

            # 识别VM出口块
            elif self._is_vm_exit_block(block):
                structures['vm_exit_blocks'].append({
                    'block_id': block_id,
                    'address': block_data['start_address_hex']
                })

        return structures

    def _analyze_cfg_features(self) -> Dict[str, Any]:
        """分析CFG特征"""
        analysis = {
            'connectivity': self._calculate_connectivity(),
            'cyclomatic_complexity': self._calculate_cyclomatic_complexity(),
            'average_degree': self._calculate_average_degree(),
            'strongly_connected_components': 0,  # 简化实现
            'depth': self._calculate_cfg_depth()
        }

        return analysis

    def _is_control_flow_instruction(self, instruction: Dict[str, Any]) -> bool:
        """检查是否为控制流指令"""
        opcode = instruction.get('opcode', '').upper()
        control_flow_ops = ['JMP', 'CALL', 'RET', 'JE', 'JNE', 'JZ', 'JNZ', 'JA', 'JB', 'JG', 'JL']
        return opcode in control_flow_ops

    def _is_conditional_branch(self, instruction: Dict[str, Any]) -> bool:
        """检查是否为条件分支"""
        opcode = instruction.get('opcode', '').upper()
        return opcode in ['JE', 'JNE', 'JZ', 'JNZ', 'JA', 'JB', 'JG', 'JL']

    def _is_unconditional_branch(self, instruction: Dict[str, Any]) -> bool:
        """检查是否为无条件分支"""
        opcode = instruction.get('opcode', '').upper()
        return opcode in ['JMP', 'CALL', 'RET']

    def _classify_block_type(self, block: List[Dict[str, Any]]) -> str:
        """分类基本块类型"""
        if not block:
            return 'empty'

        last_instr = block[-1]

        if self._is_control_flow_instruction(last_instr):
            if self._is_conditional_branch(last_instr):
                return 'conditional_branch'
            elif self._is_unconditional_branch(last_instr):
                return 'unconditional_branch'

        return 'sequential'

    def _detect_block_vm_indicators(self, block: List[Dict[str, Any]]) -> List[str]:
        """检测块的VM指示器"""
        indicators = []

        for instr in block:
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()

            # VM相关指令
            if opcode in ['VMCALL', 'VMLAUNCH', 'VMRESUME']:
                indicators.append('vm_instruction')

            # 栈操作模式
            if opcode in ['PUSH', 'POP'] and any(reg in operands for reg in ['EBP', 'ESP']):
                indicators.append('stack_operation')

            # 间接跳转
            if opcode == 'JMP' and '[' in operands:
                indicators.append('indirect_jump')

        return list(set(indicators))

    def _is_entry_block(self, block: List[Dict[str, Any]]) -> bool:
        """检查是否为入口块"""
        # 简化的入口块检测
        if len(block) < 3:
            return False

        # 检查典型的VM入口模式
        first_instrs = [instr.get('opcode', '').upper() for instr in block[:3]]
        entry_patterns = [
            ['PUSH', 'MOV', 'SUB'],  # 栈设置
            ['PUSHAD', 'MOV', 'SUB'],  # 寄存器保存
            ['PUSHFD', 'PUSHAD', 'MOV']  # 标志保存
        ]

        for pattern in entry_patterns:
            if first_instrs[:len(pattern)] == pattern:
                return True

        return False

    def _find_branch_targets(self, branch_instr: Dict[str, Any],
                             basic_blocks: List[List[Dict[str, Any]]],
                             instructions: List[Dict[str, Any]]) -> List[int]:
        """查找分支目标块"""
        targets = []

        # 简化的目标查找
        # 在实际实现中,需要分析分支指令的操作数来确定目标地址

        # 这里返回下一个块作为占位符
        current_block_index = self._find_block_containing_instruction(branch_instr, basic_blocks)
        if current_block_index is not None and current_block_index < len(basic_blocks) - 1:
            targets.append(current_block_index + 1)

        return targets

    def _find_block_containing_instruction(self, instruction: Dict[str, Any],
                                           basic_blocks: List[List[Dict[str, Any]]]) -> Optional[int]:
        """查找包含指定指令的块"""
        for i, block in enumerate(basic_blocks):
            for instr in block:
                if instr.get('address') == instruction.get('address'):
                    return i
        return None

    def _get_branch_condition(self, branch_instr: Dict[str, Any]) -> str:
        """获取分支条件"""
        opcode = branch_instr.get('opcode', '').upper()

        condition_map = {
            'JE': 'equal',
            'JNE': 'not_equal',
            'JG': 'greater',
            'JL': 'less',
            'JA': 'above',
            'JB': 'below',
            'JMP': 'always',
            'CALL': 'call',
            'RET': 'return'
        }

        return condition_map.get(opcode, 'unknown')

    def _is_dispatcher_block(self, block: List[Dict[str, Any]]) -> bool:
        """检查是否为分发器块"""
        if not block:
            return False

        # 检查间接跳转模式
        indirect_branches = 0
        for instr in block:
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            if opcode == 'JMP' and '[' in operands:
                indirect_branches += 1

        return indirect_branches > 0

    def _is_handler_block(self, block: List[Dict[str, Any]]) -> bool:
        """检查是否为处理器块"""
        if len(block) < 3:
            return False

        # 检查VM指令比例
        vm_ratio = self._calculate_vm_instruction_ratio(block)
        return vm_ratio > 0.3

    def _is_vm_entry_block(self, block: List[Dict[str, Any]]) -> bool:
        """检查是否为VM入口块"""
        # 简化的入口检测
        if len(block) < 2:
            return False

        first_instr = block[0].get('opcode', '').upper()

        # 检查典型的入口指令
        entry_indicators = ['PUSHAD', 'PUSHFD', 'ENTER']
        return first_instr in entry_indicators

    def _is_vm_exit_block(self, block: List[Dict[str, Any]]) -> bool:
        """检查是否为VM出口块"""
        if not block:
            return False

        last_instr = block[-1].get('opcode', '').upper()

        # 检查典型的出口指令
        exit_indicators = ['POPAD', 'POPFD', 'LEAVE', 'RET']
        return last_instr in exit_indicators

    def _calculate_vm_instruction_ratio(self, block: List[Dict[str, Any]]) -> float:
        """计算VM指令比例"""
        if not block:
            return 0.0

        vm_count = 0
        for instr in block:
            indicators = self._detect_block_vm_indicators([instr])
            if indicators:
                vm_count += 1

        return vm_count / len(block)

    def _calculate_connectivity(self) -> float:
        """计算连通性"""
        if not self.nodes:
            return 0.0

        max_edges = len(self.nodes) * (len(self.nodes) - 1)
        if max_edges == 0:
            return 0.0

        return len(self.edges) / max_edges

    def _calculate_cyclomatic_complexity(self) -> int:
        """计算圈复杂度"""
        # V(G) = E - N + 2P
        # 其中 E = 边数, N = 节点数, P = 连通分量数(这里假设为1)

        E = len(self.edges)
        N = len(self.nodes)
        P = 1  # 假设单个连通分量

        return E - N + 2 * P

    def _calculate_average_degree(self) -> float:
        """计算平均度数"""
        if not self.nodes:
            return 0.0

        in_degree = defaultdict(int)
        out_degree = defaultdict(int)

        for edge in self.edges:
            out_degree[edge['from']] += 1
            in_degree[edge['to']] += 1

        total_degree = sum(in_degree.values()) + sum(out_degree.values())
        return total_degree / len(self.nodes)

    def _calculate_cfg_depth(self) -> int:
        """计算CFG深度"""
        if not self.entry_points:
            return 0

        # 简化的深度计算
        visited = set()
        max_depth = 0

        for entry in self.entry_points:
            depth = self._bfs_depth(entry, visited)
            max_depth = max(max_depth, depth)

        return max_depth

    def _bfs_depth(self, start_node: int, visited: Set[int]) -> int:
        """BFS计算深度"""
        queue = deque([(start_node, 0)])
        max_depth = 0

        while queue:
            node, depth = queue.popleft()

            if node in visited:
                continue

            visited.add(node)
            max_depth = max(max_depth, depth)

            # 添加后继节点
            for edge in self.edges:
                if edge['from'] == node:
                    queue.append((edge['to'], depth + 1))

        return max_depth

    def to_dot(self) -> str:
        """生成DOT格式的CFG"""
        dot_lines = ["digraph CFG {"]
        dot_lines.append("  rankdir=TB;")  # 从上到下布局
        dot_lines.append("  node [shape=rectangle, style=filled, fillcolor=lightblue];")

        # 添加节点
        for node_id, node_data in self.nodes.items():
            label = f"BB{node_id}\\n{node_data['start_address_hex']}\\n{node_data['instruction_count']} instr"

            # 根据节点类型设置颜色
            if node_id in self.entry_points:
                fillcolor = "lightgreen"  # 入口点
            elif node_data['type'] == 'conditional_branch':
                fillcolor = "lightyellow"  # 条件分支
            elif node_data['type'] == 'unconditional_branch':
                fillcolor = "lightcoral"  # 无条件分支
            else:
                fillcolor = "lightblue"  # 顺序块

            dot_lines.append(f'  node{node_id} [label="{label}", fillcolor="{fillcolor}"];')

        # 添加边
        for edge in self.edges:
            from_node = edge['from']
            to_node = edge['to']

            if edge['type'] == 'conditional':
                style = "dashed"
                color = "red"
            else:
                style = "solid"
                color = "black"

            dot_lines.append(f'  node{from_node} -> node{to_node} [style={style}, color={color}];')

        dot_lines.append("}")
        return "\n".join(dot_lines)


2、instruction_analyzer.py
[Asm] 纯文本查看 复制代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
指令模式分析器
分析VMProtect虚拟化指令的模式和特征
"""

import re
import logging
import time
from typing import Dict, List, Tuple, Optional, Any, Set
from collections import defaultdict, Counter
from dataclasses import dataclass

from ..models.instruction import Instruction, InstructionType, OperandType


@dataclass
class InstructionPattern:
    """指令模式"""
    name: str
    description: str
    patterns: List[str]  # 正则表达式模式列表
    weight: float = 1.0
    category: str = "general"


class InstructionAnalyzer:
    """指令模式分析器"""
    
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        初始化指令分析器
        
        Args:
            config: 配置字典
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        
        # VMProtect特征模式
        self.vmp_patterns = self._init_vmp_patterns()
        
        # 统计分析结果
        self.analysis_results = {}
    
    def _init_vmp_patterns(self) -> List[InstructionPattern]:
        """初始化VMProtect特征模式"""
        return [
            # VM入口模式
            InstructionPattern(
                name="vm_entry_pushad",
                description="VM入口保护所有寄存器",
                patterns=[r"pushad", r"pusha"],
                weight=2.0,
                category="vm_entry"
            ),
            InstructionPattern(
                name="vm_entry_stack_alloc",
                description="VM入口栈空间分配",
                patterns=[r"sub.*esp.*0x[0-9a-f]+", r"add.*esp.*-0x[0-9a-f]+"],
                weight=1.5,
                category="vm_entry"
            ),
            
            # VM分发器模式
            InstructionPattern(
                name="vm_dispatch_mov_reg_mem",
                description="VM分发器从内存加载handler地址",
                patterns=[r"mov.*eax.*\[ebp\\+0x[0-9a-f]+\]", 
                         r"mov.*ebx.*\[esp\\+0x[0-9a-f]+\]"],
                weight=2.0,
                category="vm_dispatch"
            ),
            InstructionPattern(
                name="vm_dispatch_switch",
                description="VM分发器跳转表",
                patterns=[r"jmp.*\[eax\\*4\\+0x[0-9a-f]+\]",
                         r"jmp.*\[ebx\\*4\\+0x[0-9a-f]+\]"],
                weight=2.5,
                category="vm_dispatch"
            ),
            
            # VM Handler模式
            InstructionPattern(
                name="vm_handler_stack_ops",
                description="VM Handler栈操作",
                patterns=[r"mov.*\[ebp\\+0x[0-9a-f]+\].*eax",
                         r"mov.*eax.*\[ebp\\+0x[0-9a-f]+\]"],
                weight=1.5,
                category="vm_handler"
            ),
            InstructionPattern(
                name="vm_handler_arithmetic",
                description="VM Handler算术运算",
                patterns=[r"add.*eax.*ebx", r"sub.*eax.*ebx", 
                         r"xor.*eax.*ebx", r"and.*eax.*ebx"],
                weight=1.0,
                category="vm_handler"
            ),
            
            # 混淆指令模式
            InstructionPattern(
                name="obfuscated_jumps",
                description="混淆跳转指令",
                patterns=[r"jmp.*eax", r"jmp.*ebx", r"call.*eax", r"call.*ebx"],
                weight=1.8,
                category="obfuscation"
            ),
            InstructionPattern(
                name="obfuscated_push_pop",
                description="混淆push/pop序列",
                patterns=[r"push.*eax.*pop.*ebx", r"push.*ebx.*pop.*eax"],
                weight=1.2,
                category="obfuscation"
            ),
            
            # 反分析模式
            InstructionPattern(
                name="anti_analysis_int3",
                description="反分析断点指令",
                patterns=[r"int3", r"int 3"],
                weight=2.0,
                category="anti_analysis"
            ),
            InstructionPattern(
                name="anti_analysis_ice",
                description="反分析ICE指令",
                patterns=[r"int1", r"int 0x1"],
                weight=2.5,
                category="anti_analysis"
            )
        ]
    
    def analyze_instruction_sequence(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """
        分析指令序列模式
        
        Args:
            instructions: 指令列表
            
        Returns:
            Dict[str, Any]: 分析结果
        """
        if not instructions:
            self.logger.warning("没有指令可供分析")
            return {
                'pattern_matches': {'matches_by_category': {}, 'pattern_counts': {}, 'total_matches': 0},
                'statistics': {'total_instructions': 0, 'instruction_frequency': {}},
                'vm_features': {},
                'complexity_metrics': {},
                'suspicious_sequences': [],
                'execution_time': 0.0
            }
        
        self.logger.info(f"开始分析指令序列,共 {len(instructions)} 条指令")
        
        start_time = time.time()
        
        results = {
            'pattern_matches': {},
            'statistics': {},
            'vm_features': {},
            'complexity_metrics': {},
            'suspicious_sequences': []
        }
        
        # 1. 模式匹配
        pattern_results = self._match_patterns(instructions)
        results['pattern_matches'] = pattern_results
        
        # 2. 统计分析
        stats = self._analyze_statistics(instructions)
        results['statistics'] = stats
        
        # 3. VM特征检测
        vm_features = self._detect_vm_features(instructions, pattern_results)
        results['vm_features'] = vm_features
        
        # 4. 复杂度分析
        complexity = self._analyze_complexity(instructions, pattern_results)
        results['complexity_metrics'] = complexity
        
        # 5. 可疑序列检测
        suspicious = self._find_suspicious_sequences(instructions)
        results['suspicious_sequences'] = suspicious
        
        execution_time = time.time() - start_time
        results['execution_time'] = execution_time
        
        self.analysis_results = results
        return results
    
    def _match_patterns(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """匹配指令模式"""
        matches = defaultdict(list)
        pattern_counts = Counter()
        
        for pattern in self.vmp_patterns:
            for instruction in instructions:
                disasm = instruction.disassembly.lower()
                
                for regex_pattern in pattern.patterns:
                    if re.search(regex_pattern, disasm, re.IGNORECASE):
                        match_info = {
                            'instruction_address': instruction.hex_address,
                            'disassembly': instruction.disassembly,
                            'pattern_name': pattern.name,
                            'weight': pattern.weight
                        }
                        matches[pattern.category].append(match_info)
                        pattern_counts[pattern.name] += 1
                        break
        
        return {
            'matches_by_category': dict(matches),
            'pattern_counts': dict(pattern_counts),
            'total_matches': sum(len(matches[cat]) for cat in matches)
        }
    
    def _analyze_statistics(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """统计分析指令序列"""
        if not instructions:
            return {
                'total_instructions': 0,
                'instruction_frequency': {},
                'instruction_type_distribution': {},
                'instruction_size_stats': {
                    'average': 0,
                    'min': 0,
                    'max': 0,
                    'total_bytes': 0
                },
                'top_mnemonics': {},
                'register_usage': {},
                'address_range': 0,
                'instruction_density': 0
            }
        
        # 指令类型统计
        instruction_types = Counter()
        instruction_sizes = []
        mnemonics = Counter()
        register_usage = Counter()
        
        for instr in instructions:
            # 修复:确保instruction_type是InstructionType枚举
            if hasattr(instr.instruction_type, 'value'):
                instruction_types[instr.instruction_type.value] += 1
            else:
                # 如果不是枚举,使用字符串表示
                instruction_types[str(instr.instruction_type)] += 1
            
            instruction_sizes.append(instr.size)
            mnemonics[instr.mnemonic.lower()] += 1
            
            # 分析寄存器使用
            for operand in instr.operands:
                if operand.type == OperandType.REGISTER:
                    register_usage[operand.value.lower()] += 1
        
        # 计算统计指标
        total_instructions = len(instructions)
        avg_instruction_size = sum(instruction_sizes) / len(instruction_sizes) if instruction_sizes else 0
        
        # 指令密度分析
        addresses = [instr.address for instr in instructions]
        address_range = max(addresses) - min(addresses) if addresses else 0
        instruction_density = total_instructions / address_range if address_range > 0 else 0
        
        return {
            'total_instructions': total_instructions,
            'instruction_frequency': dict(mnemonics),
            'instruction_type_distribution': dict(instruction_types),
            'instruction_size_stats': {
                'average': avg_instruction_size,
                'min': min(instruction_sizes) if instruction_sizes else 0,
                'max': max(instruction_sizes) if instruction_sizes else 0,
                'total_bytes': sum(instruction_sizes)
            },
            'top_mnemonics': dict(mnemonics.most_common(10)),
            'register_usage': dict(register_usage.most_common(15)),
            'address_range': address_range,
            'instruction_density': instruction_density
        }
    
    def _detect_vm_features(self, instructions: List[Instruction], 
                          pattern_results: Dict[str, Any]) -> Dict[str, Any]:
        """检测VMProtect特征"""
        vm_features = {
            'vm_entry_detected': False,
            'vm_dispatch_detected': False,
            'vm_handlers_detected': False,
            'obfuscation_techniques': [],
            'anti_analysis_detected': False,
            'confidence_score': 0.0
        }
        
        matches_by_category = pattern_results.get('matches_by_category', {})
        
        # 检测VM入口
        if 'vm_entry' in matches_by_category:
            vm_features['vm_entry_detected'] = True
            vm_features['confidence_score'] += 2.0
        
        # 检测VM分发器
        if 'vm_dispatch' in matches_by_category:
            vm_features['vm_dispatch_detected'] = True
            vm_features['confidence_score'] += 2.5
        
        # 检测VM Handlers
        if 'vm_handler' in matches_by_category:
            vm_features['vm_handlers_detected'] = True
            vm_features['confidence_score'] += 1.5
        
        # 检测混淆技术
        if 'obfuscation' in matches_by_category:
            obfuscation_matches = matches_by_category['obfuscation']
            vm_features['obfuscation_techniques'] = [
                match['pattern_name'] for match in obfuscation_matches
            ]
            vm_features['confidence_score'] += len(obfuscation_matches) * 0.3
        
        # 检测反分析技术
        if 'anti_analysis' in matches_by_category:
            vm_features['anti_analysis_detected'] = True
            vm_features['confidence_score'] += 2.0
        
        # 归一化置信度分数 (0-10)
        vm_features['confidence_score'] = min(vm_features['confidence_score'], 10.0)
        
        return vm_features
    
    def _analyze_complexity(self, instructions: List[Instruction],
                          pattern_results: Dict[str, Any]) -> Dict[str, Any]:
        """分析指令序列复杂度"""
        if not instructions:
            return {
                'instruction_variety': 0.0,
                'pattern_density': 0.0,
                'control_flow_complexity': 0.0,
                'data_flow_complexity': 0.0,
                'overall_complexity': 0.0
            }
        
        complexity_metrics = {
            'instruction_variety': 0.0,
            'pattern_density': 0.0,
            'control_flow_complexity': 0.0,
            'data_flow_complexity': 0.0,
            'overall_complexity': 0.0
        }
        
        total_instructions = len(instructions)
        
        # 指令多样性
        unique_mnemonics = len(set(instr.mnemonic.lower() for instr in instructions))
        complexity_metrics['instruction_variety'] = unique_mnemonics / total_instructions
        
        # 模式密度
        total_matches = pattern_results.get('total_matches', 0)
        complexity_metrics['pattern_density'] = total_matches / total_instructions
        
        # 控制流复杂度 (分支指令比例)
        branch_instructions = sum(1 for instr in instructions if instr.is_branch())
        complexity_metrics['control_flow_complexity'] = branch_instructions / total_instructions
        
        # 数据流复杂度 (内存访问指令比例)
        memory_instructions = sum(1 for instr in instructions if instr.is_memory_access())
        complexity_metrics['data_flow_complexity'] = memory_instructions / total_instructions
        
        # 总体复杂度 (加权平均)
        weights = [0.2, 0.3, 0.25, 0.25]  # 多样性, 模式密度, 控制流, 数据流
        metrics = [
            complexity_metrics['instruction_variety'],
            complexity_metrics['pattern_density'], 
            complexity_metrics['control_flow_complexity'],
            complexity_metrics['data_flow_complexity']
        ]
        
        complexity_metrics['overall_complexity'] = sum(w * m for w, m in zip(weights, metrics))
        
        return complexity_metrics
    
    def _find_suspicious_sequences(self, instructions: List[Instruction]) -> List[Dict[str, Any]]:
        """查找可疑指令序列"""
        suspicious_sequences = []
        
        # 查找连续的push/pop序列
        push_pop_sequences = self._find_push_pop_sequences(instructions)
        if push_pop_sequences:
            suspicious_sequences.extend(push_pop_sequences)
        
        # 查找间接跳转序列
        indirect_jumps = self._find_indirect_jumps(instructions)
        if indirect_jumps:
            suspicious_sequences.extend(indirect_jumps)
        
        # 查找不寻常的指令组合
        unusual_combinations = self._find_unusual_combinations(instructions)
        if unusual_combinations:
            suspicious_sequences.extend(unusual_combinations)
        
        return suspicious_sequences
    
    def _find_push_pop_sequences(self, instructions: List[Instruction]) -> List[Dict[str, Any]]:
        """查找可疑的push/pop序列"""
        sequences = []
        
        for i in range(len(instructions) - 3):
            window = instructions[i:i+4]
            
            # 检查是否是push reg + 操作 + pop same_reg 模式
            if (window[0].mnemonic.lower() == 'push' and 
                window[3].mnemonic.lower() == 'pop' and
                len(window[0].operands) > 0 and len(window[3].operands) > 0):
                
                push_reg = window[0].operands[0].value
                pop_reg = window[3].operands[0].value
                
                if push_reg == pop_reg:
                    sequence_info = {
                        'type': 'suspicious_push_pop',
                        'description': f'冗余的push/pop {push_reg} 序列',
                        'instructions': [instr.disassembly for instr in window],
                        'addresses': [instr.hex_address for instr in window],
                        'severity': 'medium'
                    }
                    sequences.append(sequence_info)
        
        return sequences
    
    def _find_indirect_jumps(self, instructions: List[Instruction]) -> List[Dict[str, Any]]:
        """查找间接跳转指令"""
        sequences = []
        
        for instr in instructions:
            if instr.is_branch() and instr.operands:
                first_operand = instr.operands[0]
                
                # 检查是否是寄存器间接跳转
                if first_operand.type == OperandType.REGISTER:
                    sequence_info = {
                        'type': 'indirect_jump',
                        'description': f'间接跳转到 {first_operand.value}',
                        'instructions': [instr.disassembly],
                        'addresses': [instr.hex_address],
                        'severity': 'high'
                    }
                    sequences.append(sequence_info)
        
        return sequences
    
    def _find_unusual_combinations(self, instructions: List[Instruction]) -> List[Dict[str, Any]]:
        """查找不寻常的指令组合"""
        sequences = []
        
        for i in range(len(instructions) - 2):
            window = instructions[i:i+3]
            
            # 检查算术运算后立即测试同一寄存器
            if (window[0].mnemonic.lower() in ['add', 'sub', 'xor', 'and', 'or'] and
                window[1].mnemonic.lower() in ['test', 'cmp']):
                
                if (window[0].operands and window[1].operands and
                    window[0].operands[0].value == window[1].operands[0].value):
                    
                    sequence_info = {
                        'type': 'unusual_arithmetic_test',
                        'description': '算术运算后立即测试同一寄存器',
                        'instructions': [instr.disassembly for instr in window],
                        'addresses': [instr.hex_address for instr in window],
                        'severity': 'low'
                    }
                    sequences.append(sequence_info)
        
        return sequences
    
    def _find_common_sequences(self, instructions: List[Instruction]) -> List[Dict[str, Any]]:
        """
        查找常见序列 - 用于测试兼容性
        
        Args:
            instructions: 指令列表
            
        Returns:
            常见序列列表
        """
        sequences = []
        
        # 简单的序列检测
        for i in range(len(instructions) - 2):
            sequence = instructions[i:i+3]
            
            # 检查是否是常见的指令序列
            mnemonics = [instr.mnemonic.lower() for instr in sequence]
            
            # 常见的函数序言
            if mnemonics == ['push', 'mov', 'sub']:
                sequences.append({
                    'type': 'function_prologue',
                    'description': '函数序言序列',
                    'instructions': [instr.disassembly for instr in sequence],
                    'addresses': [instr.hex_address for instr in sequence]
                })
            
            # 常见的栈操作序列
            elif mnemonics == ['push', 'push', 'mov']:
                sequences.append({
                    'type': 'stack_operation',
                    'description': '栈操作序列',
                    'instructions': [instr.disassembly for instr in sequence],
                    'addresses': [instr.hex_address for instr in sequence]
                })
        
        return sequences
    
    def analyze_instruction_patterns(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """分析指令模式 - 用于测试兼容性"""
        return self.analyze_instruction_sequence(instructions)
    
    def detect_vmp_patterns(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """检测VMP模式 - 用于测试兼容性"""
        results = self.analyze_instruction_sequence(instructions)
        return {
            'vmp_indicators': results.get('vm_features', {}).get('obfuscation_techniques', []),
            'obfuscation_techniques': results.get('vm_features', {}).get('obfuscation_techniques', []),
            'confidence_score': results.get('vm_features', {}).get('confidence_score', 0.0)
        }
    
    def identify_instruction_type(self, instruction: Instruction) -> str:
        """识别指令类型 - 用于测试兼容性"""
        # 改进的类型识别逻辑,考虑操作数
        mnemonic = instruction.mnemonic.lower()
        
        if mnemonic == 'push':
            return 'stack_operation'
        elif mnemonic == 'pop':
            return 'stack_operation'
        elif mnemonic == 'mov':
            # 如果有操作数,检查是否是内存存储
            if instruction.operands:
                # 检查最后一个操作数是否是内存引用(包含[])
                for operand in instruction.operands:
                    if operand.type == OperandType.MEMORY:
                        return 'memory_store'
                    elif '[' in operand.value:
                        return 'memory_store'
            # 检查opcode特征 - 某些mov指令是内存存储
            if instruction.opcode in ['48897df8', '488975f0']:
                return 'memory_store'
            return 'register_transfer'
        elif mnemonic in ['add', 'sub', 'mul', 'div']:
            return 'arithmetic'
        elif mnemonic in ['jmp', 'je', 'jne', 'call', 'ret']:
            return 'control_flow'
        else:
            return 'unknown'
    
    def calculate_complexity(self, instructions: List[Instruction]) -> Dict[str, Any]:
        """计算复杂度 - 用于测试兼容性"""
        if not instructions:
            return {
                'instruction_count': 0,
                'unique_opcodes': 0,
                'complexity_score': 0.0
            }
        
        # 计算唯一操作码
        unique_opcodes = len(set(instr.opcode for instr in instructions))
        
        # 计算平均复杂度
        total_complexity = sum(instr.analyze_complexity() for instr in instructions)
        avg_complexity = total_complexity / len(instructions) if instructions else 0.0
        
        return {
            'instruction_count': len(instructions),
            'unique_opcodes': unique_opcodes,
            'complexity_score': avg_complexity
        }
    
    def analyze_control_flow(self, instructions: List[Instruction]) -> Dict[str, Any]:


3、pattern_matcher.py
[Python] 纯文本查看 复制代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
指令模式匹配工具
提供通用的模式匹配功能,支持正则表达式和语法树匹配
"""

import re
import logging
from typing import Dict, List, Tuple, Optional, Any, Set, Union
from collections import defaultdict, Counter
from dataclasses import dataclass
from enum import Enum


class MatchType(Enum):
    """匹配类型枚举"""
    REGEX = "regex"
    STRING = "string"
    TOKEN = "token"
    AST = "ast"
    FUZZY = "fuzzy"


class MatchResult:
    """匹配结果"""
    
    def __init__(self, pattern_name: str, match_type: MatchType, 
                 confidence: float, details: Dict[str, Any]):
        self.pattern_name = pattern_name
        self.match_type = match_type
        self.confidence = confidence
        self.details = details
        self.matched_elements = details.get('matched_elements', [])
        self.position = details.get('position', 0)
        self.context = details.get('context', {})
    
    def __str__(self) -> str:
        return f"{self.pattern_name} ({self.match_type.value}): {self.confidence:.2f}"
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'pattern_name': self.pattern_name,
            'match_type': self.match_type.value,
            'confidence': self.confidence,
            'details': self.details
        }


@dataclass
class Pattern:
    """模式定义"""
    name: str
    description: str
    patterns: List[str]
    match_type: MatchType = MatchType.REGEX
    weight: float = 1.0
    category: str = "general"
    min_confidence: float = 0.5
    context_requirements: Optional[Dict[str, Any]] = None
    
    def __post_init__(self):
        """初始化后处理"""
        if self.context_requirements is None:
            self.context_requirements = {}


class PatternMatcher:
    """通用模式匹配器"""
    
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        初始化模式匹配器
        
        Args:
            config: 配置字典
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        
        # 编译的正则表达式缓存
        self._regex_cache: Dict[str, re.Pattern] = {}
        
        # 匹配结果
        self.match_results: List[MatchResult] = []
    
    def add_pattern(self, pattern: Pattern) -> bool:
        """
        添加模式
        
        Args:
            pattern: 模式定义
            
        Returns:
            bool: 是否成功添加
        """
        try:
            # 预编译正则表达式
            if pattern.match_type == MatchType.REGEX:
                for pattern_str in pattern.patterns:
                    if pattern_str not in self._regex_cache:
                        self._regex_cache[pattern_str] = re.compile(pattern_str, re.IGNORECASE)
            return True
        except Exception as e:
            self.logger.error(f"添加模式失败 {pattern.name}: {e}")
            return False
    
    def match_single(self, text: str, pattern: Pattern) -> Optional[MatchResult]:
        """
        单文本匹配
        
        Args:
            text: 要匹配的文本
            pattern: 匹配模式
            
        Returns:
            Optional[MatchResult]: 匹配结果
        """
        if pattern.match_type == MatchType.REGEX:
            return self._match_regex(text, pattern)
        elif pattern.match_type == MatchType.STRING:
            return self._match_string(text, pattern)
        elif pattern.match_type == MatchType.TOKEN:
            return self._match_token(text, pattern)
        elif pattern.match_type == MatchType.FUZZY:
            return self._match_fuzzy(text, pattern)
        else:
            self.logger.warning(f"不支持的匹配类型: {pattern.match_type}")
            return None
    
    def match_multiple(self, texts: List[str], pattern: Pattern) -> List[MatchResult]:
        """
        多文本匹配
        
        Args:
            texts: 文本列表
            pattern: 匹配模式
            
        Returns:
            List[MatchResult]: 匹配结果列表
        """
        results = []
        for i, text in enumerate(texts):
            result = self.match_single(text, pattern)
            if result:
                result.details['position'] = i
                results.append(result)
        return results
    
    def match_sequence(self, sequence: List[str], pattern: Pattern) -> List[MatchResult]:
        """
        序列匹配
        
        Args:
            sequence: 文本序列
            pattern: 匹配模式
            
        Returns:
            List[MatchResult]: 匹配结果列表
        """
        if pattern.match_type != MatchType.REGEX:
            self.logger.warning("序列匹配目前只支持正则表达式")
            return []
        
        results = []
        sequence_text = ' '.join(sequence)
        
        for pattern_str in pattern.patterns:
            if pattern_str not in self._regex_cache:
                self._regex_cache[pattern_str] = re.compile(pattern_str, re.IGNORECASE)
            
            regex = self._regex_cache[pattern_str]
            matches = regex.finditer(sequence_text)
            
            for match in matches:
                confidence = self._calculate_regex_confidence(match, pattern_str)
                if confidence >= pattern.min_confidence:
                    result = MatchResult(
                        pattern_name=pattern.name,
                        match_type=pattern.match_type,
                        confidence=confidence,
                        details={
                            'matched_text': match.group(),
                            'matched_groups': match.groups(),
                            'start_pos': match.start(),
                            'end_pos': match.end(),
                            'pattern_used': pattern_str
                        }
                    )
                    results.append(result)
        
        return results
    
    def _match_regex(self, text: str, pattern: Pattern) -> Optional[MatchResult]:
        """正则表达式匹配"""
        best_match = None
        best_confidence = 0.0
        
        for pattern_str in pattern.patterns:
            if pattern_str not in self._regex_cache:
                self._regex_cache[pattern_str] = re.compile(pattern_str, re.IGNORECASE)
            
            regex = self._regex_cache[pattern_str]
            match = regex.search(text)
            
            if match:
                confidence = self._calculate_regex_confidence(match, pattern_str)
                
                if confidence > best_confidence and confidence >= pattern.min_confidence:
                    best_confidence = confidence
                    best_match = MatchResult(
                        pattern_name=pattern.name,
                        match_type=pattern.match_type,
                        confidence=confidence,
                        details={
                            'matched_text': match.group(),
                            'matched_groups': match.groups(),
                            'start_pos': match.start(),
                            'end_pos': match.end(),
                            'pattern_used': pattern_str
                        }
                    )
        
        return best_match
    
    def _match_string(self, text: str, pattern: Pattern) -> Optional[MatchResult]:
        """字符串匹配"""
        text_lower = text.lower()
        
        for pattern_str in pattern.patterns:
            pattern_lower = pattern_str.lower()
            
            if pattern_lower in text_lower:
                # 计算匹配置信度
                match_length = len(pattern_str)
                text_length = len(text)
                position = text_lower.find(pattern_lower)
                
                confidence = self._calculate_string_confidence(
                    match_length, text_length, position
                )
                
                if confidence >= pattern.min_confidence:
                    return MatchResult(
                        pattern_name=pattern.name,
                        match_type=pattern.match_type,
                        confidence=confidence,
                        details={
                            'matched_text': pattern_str,
                            'position': position,
                            'match_length': match_length
                        }
                    )
        
        return None
    
    def _match_token(self, text: str, pattern: Pattern) -> Optional[MatchResult]:
        """令牌匹配"""
        # 简单的令牌化:按空格分割
        tokens = text.lower().split()
        
        for pattern_str in pattern.patterns:
            pattern_tokens = pattern_str.lower().split()
            
            # 查找令牌序列
            for i in range(len(tokens) - len(pattern_tokens) + 1):
                if tokens[i:i+len(pattern_tokens)] == pattern_tokens:
                    confidence = self._calculate_token_confidence(pattern_tokens, tokens)
                    
                    if confidence >= pattern.min_confidence:
                        return MatchResult(
                            pattern_name=pattern.name,
                            match_type=pattern.match_type,
                            confidence=confidence,
                            details={
                                'matched_tokens': pattern_tokens,
                                'start_token': i,
                                'token_count': len(pattern_tokens)
                            }
                        )
        
        return None
    
    def _match_fuzzy(self, text: str, pattern: Pattern) -> Optional[MatchResult]:
        """模糊匹配"""
        text_lower = text.lower()
        best_match = None
        best_confidence = 0.0
        
        for pattern_str in pattern.patterns:
            pattern_lower = pattern_str.lower()
            
            # 简单的模糊匹配:计算最长公共子序列
            lcs_length = self._longest_common_subsequence(text_lower, pattern_lower)
            
            if lcs_length > 0:
                confidence = (2.0 * lcs_length) / (len(text_lower) + len(pattern_lower))
                
                if confidence > best_confidence and confidence >= pattern.min_confidence:
                    best_confidence = confidence
                    best_match = MatchResult(
                        pattern_name=pattern.name,
                        match_type=pattern.match_type,
                        confidence=confidence,
                        details={
                            'pattern_text': pattern_str,
                            'lcs_length': lcs_length,
                            'similarity_ratio': confidence
                        }
                    )
        
        return best_match
    
    def _calculate_regex_confidence(self, match: re.Match, pattern: str) -> float:
        """计算正则表达式匹配置信度"""
        base_confidence = 0.5
        
        # 匹配长度因素
        match_length = len(match.group())
        if match_length > 10:
            base_confidence += 0.2
        elif match_length > 5:
            base_confidence += 0.1
        
        # 捕获组因素
        if match.groups():
            base_confidence += 0.1
        
        # 模式复杂度因素
        if any(char in pattern for char in ['*', '+', '{', '}', '[', ']']):
            base_confidence += 0.1
        
        return min(base_confidence, 1.0)
    
    def _calculate_string_confidence(self, match_length: int, text_length: int, 
                                   position: int) -> float:
        """计算字符串匹配置信度"""
        # 匹配比例
        ratio = match_length / text_length if text_length > 0 else 0
        
        # 位置因素(开头匹配通常更重要)
        position_factor = 1.0 - (position / max(text_length, 1))
        
        confidence = (ratio * 0.7) + (position_factor * 0.3)
        return min(confidence, 1.0)
    
    def _calculate_token_confidence(self, pattern_tokens: List[str], 
                                  text_tokens: List[str]) -> float:
        """计算令牌匹配置信度"""
        pattern_length = len(pattern_tokens)
        text_length = len(text_tokens)
        
        if pattern_length == 0 or text_length == 0:
            return 0.0
        
        # 令牌长度比例
        length_ratio = pattern_length / text_length
        
        # 令牌重要性(假设较长的令牌更重要)
        token_importance = sum(len(token) for token in pattern_tokens) / pattern_length
        
        confidence = (length_ratio * 0.6) + (token_importance * 0.4)
        return min(confidence, 1.0)
    
    def _longest_common_subsequence(self, text1: str, text2: str) -> int:
        """计算最长公共子序列长度"""
        m, n = len(text1), len(text2)
        dp = [[0] * (n + 1) for _ in range(m + 1)]
        
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if text1[i - 1] == text2[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1] + 1
                else:
                    dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
        
        return dp[m][n]
    
    def batch_match(self, texts: List[str], patterns: List[Pattern]) -> Dict[str, List[MatchResult]]:
        """
        批量匹配
        
        Args:
            texts: 文本列表
            patterns: 模式列表
            
        Returns:
            Dict[str, List[MatchResult]]: 按模式分组的匹配结果
        """
        results = defaultdict(list)
        
        for pattern in patterns:
            self.logger.debug(f"应用模式: {pattern.name}")
            
            for text in texts:
                match_result = self.match_single(text, pattern)
                if match_result:
                    results[pattern.name].append(match_result)
        
        return dict(results)
    
    def hierarchical_match(self, texts: List[str], 
                         pattern_hierarchy: Dict[str, List[Pattern]]) -> Dict[str, Any]:
        """
        分层匹配
        
        Args:
            texts: 文本列表
            pattern_hierarchy: 分层模式字典
            
        Returns:
            Dict[str, Any]: 分层匹配结果
        """
        hierarchical_results = {}
        
        for category, patterns in pattern_hierarchy.items():
            self.logger.info(f"处理类别: {category}")
            
            category_results = []
            for pattern in patterns:
                matches = self.match_multiple(texts, pattern)
                if matches:
                    category_results.extend(matches)
            
            if category_results:
                hierarchical_results[category] = {
                    'match_count': len(category_results),
                    'matches': [match.to_dict() for match in category_results],
                    'average_confidence': sum(match.confidence for match in category_results) / len(category_results)
                }
        
        return hierarchical_results
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取匹配统计信息"""
        if not self.match_results:
            return {}
        
        total_matches = len(self.match_results)
        confidence_scores = [match.confidence for match in self.match_results]
        
        # 按模式分组
        pattern_groups = defaultdict(list)
        for match in self.match_results:
            pattern_groups[match.pattern_name].append(match)
        
        return {
            'total_matches': total_matches,
            'average_confidence': sum(confidence_scores) / total_matches,
            'max_confidence': max(confidence_scores),
            'min_confidence': min(confidence_scores),
            'patterns_found': len(pattern_groups),
            'matches_by_pattern': {
                pattern: len(matches) 
                for pattern, matches in pattern_groups.items()
            },
            'confidence_distribution': self._get_confidence_distribution(confidence_scores)
        }
    
    def _get_confidence_distribution(self, confidence_scores: List[float]) -> Dict[str, int]:
        """获取置信度分布"""
        distribution = {
            'high': 0,    # 0.8-1.0
            'medium': 0,  # 0.5-0.8
            'low': 0      # 0.0-0.5
        }
        
        for score in confidence_scores:
            if score >= 0.8:
                distribution['high'] += 1
            elif score >= 0.5:
                distribution['medium'] += 1
            else:
                distribution['low'] += 1
        
        return distribution
    
    def clear_results(self):
        """清空匹配结果"""
        self.match_results.clear()
    
    def export_results(self, format: str = 'json') -> Any:
        """
        导出匹配结果
        
        Args:
            format: 导出格式 ('json', 'dict', 'list')
            
        Returns:
            Any: 导出结果
        """
        if format == 'json':
            import json
            return json.dumps([match.to_dict() for match in self.match_results], indent=2)
        elif format == 'dict':
            return [match.to_dict() for match in self.match_results]
        elif format == 'list':
            return self.match_results
        else:
            self.logger.warning(f"不支持的导出格式: {format}")
            return None


class VMPatternMatcher(PatternMatcher):
    """VMProtect专用模式匹配器"""
    
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self._init_vm_patterns()
    
    def _init_vm_patterns(self):
        """初始化VMProtect专用模式"""
        vm_patterns = [
            Pattern(
                name="vm_entry_pushad",
                description="VM入口保护所有寄存器",
                patterns=[r"pushad", r"pusha"],
                match_type=MatchType.REGEX,
                weight=2.0,
                category="vm_entry"
            ),
            Pattern(
                name="vm_entry_stack_alloc",
                description="VM入口栈空间分配",
                patterns=[r"sub.*esp.*0x[0-9a-f]+", r"add.*esp.*-0x[0-9a-f]+"],
                match_type=MatchType.REGEX,
                weight=1.5,
                category="vm_entry"
            ),
            Pattern(
                name="vm_dispatch_mov_reg_mem",
                description="VM分发器从内存加载handler地址",
                patterns=[r"mov.*eax.*\\[ebp\\+0x[0-9a-f]+\\]", 
                         r"mov.*ebx.*\\[esp\\+0x[0-9a-f]+\\]"],
                match_type=MatchType.REGEX,
                weight=2.0,
                category="vm_dispatch"
            ),
            Pattern(
                name="vm_dispatch_switch",
                description="VM分发器跳转表",
                patterns=[r"jmp.*\\[eax\\*4\\+0x[0-9a-f]+\\]",
                         r"jmp.*\\[ebx\\*4\\+0x[0-9a-f]+\\]"],
                match_type=MatchType.REGEX,
                weight=2.5,
                category="vm_dispatch"
            ),
            Pattern(
                name="vm_handler_stack_ops",
                description="VM Handler栈操作",
                patterns=[r"mov.*\\[ebp\\+0x[0-9a-f]+\\].*eax",
                         r"mov.*eax.*\\[ebp\\+0x[0-9a-f]+\\]"],
                match_type=MatchType.REGEX,
                weight=1.5,
                category="vm_handler"
            ),
            Pattern(
                name="obfuscated_jumps",
                description="混淆跳转指令",
                patterns=["jmp eax", "jmp ebx", "call eax", "call ebx"],
                match_type=MatchType.STRING,
                weight=1.8,
                category="obfuscation"
            )
        ]
        
        for pattern in vm_patterns:
            self.add_pattern(pattern)
    
    def analyze_vm_instructions(self, instructions: List[str]) -> Dict[str, Any]:
        """
        分析VM指令
        
        Args:
            instructions: 指令列表
            
        Returns:
            Dict[str, Any]: 分析结果
        """
        self.clear_results()
        
        # 执行批量匹配
        all_matches = self.batch_match(instructions, [
            pattern for pattern in self._regex_cache.values()
        ])
        
        # 收集所有匹配结果
        for matches in all_matches.values():
            self.match_results.extend(matches)
        
        # 生成分析报告
        report = {
            'statistics': self.get_statistics(),
            'matches_by_category': self._group_matches_by_category(),
            'vm_confidence_score': self._calculate_vm_confidence(),
            'recommendations': self._generate_vm_recommendations()
        }
        
        return report
    
    def _group_matches_by_category(self) -> Dict[str, Any]:
        """按类别分组匹配结果"""
        category_groups = defaultdict(list)
        
        for match in self.match_results:
            # 从模式名称推断类别
            if 'entry' in match.pattern_name:
                category = 'vm_entry'
            elif 'dispatch' in match.pattern_name:
                category = 'vm_dispatch'
            elif 'handler' in match.pattern_name:
                category = 'vm_handler'
            elif 'obfus' in match.pattern_name:
                category = 'obfuscation'
            else:
                category = 'other'
            
            category_groups[category].append(match.to_dict())
        
        return {
            category: {
                'count': len(matches),
                'average_confidence': sum(m['confidence'] for m in matches) / len(matches),
                'matches': matches
            }
            for category, matches in category_groups.items()
        }
    
    def _calculate_vm_confidence(self) -> float:
        """计算VM检测置信度"""
        if not self.match_results:
            return 0.0
        
        # 基于匹配数量和置信度计算总分
        total_score = 0.0
        
        for match in self.match_results:
            # 不同类别的权重
            if 'entry' in match.pattern_name:
                weight = 3.0
            elif 'dispatch' in match.pattern_name:
                weight = 4.0
            elif 'handler' in match.pattern_name:
                weight = 2.0
            elif 'obfus' in match.pattern_name:
                weight = 1.5
            else:
                weight = 1.0
            
            total_score += match.confidence * weight
        
        # 归一化到0-10分
        normalized_score = min(total_score / 2.0, 10.0)
        return normalized_score
    
    def _generate_vm_recommendations(self) -> List[str]:
        """生成VM分析建议"""
        recommendations = []
        
        vm_confidence = self._calculate_vm_confidence()
        category_groups = self._group_matches_by_category()
        
        if vm_confidence >= 8.0:
            recommendations.append("高置信度检测到VMProtect虚拟化")
            recommendations.append("建议使用符号执行进行深入分析")
        elif vm_confidence >= 5.0:
            recommendations.append("中等置信度检测到虚拟化保护")
            recommendations.append("建议分析VM入口和分发器")
        
        if 'vm_entry' in category_groups:
            recommendations.append(f"发现 {category_groups['vm_entry']['count']} 个VM入口点")
        
        if 'vm_dispatch' in category_groups:
            recommendations.append(f"发现 {category_groups['vm_dispatch']['count']} 个VM分发器")
        
        if 'obfuscation' in category_groups:
            recommendations.append("检测到混淆技术,建议使用自动化去混淆")
        
        if not recommendations:
            recommendations.append("未发现明显的VMProtect特征")
        
        return recommendations
        """分析控制流 - 用于测试兼容性"""
        if not instructions:
            return {
                'basic_blocks': 0,
                'jump_targets': [],
                'call_instructions': []
            }
            
        branch_instructions = [instr for instr in instructions if instr.is_branch()]
        
        return {
            'basic_blocks': len(branch_instructions) + 1,  # 简化计算
            'jump_targets': [instr.hex_address for instr in branch_instructions],
            'call_instructions': [instr for instr in branch_instructions if instr.mnemonic.lower() == 'call']
        }


4、robust_trace_parser.py
[Python] 纯文本查看 复制代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
健壮的跟踪文件解析器
处理各种格式问题
"""

import os
import re
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple


class RobustTraceParser:
    """健壮的跟踪文件解析器"""

    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

    def parse_trace_file(self, trace_file: str) -> Dict[str, Any]:
        """解析跟踪文件 - 健壮版本"""
        results = {
            'success': False,
            'instructions': [],
            'registers': [],
            'memory_accesses': [],
            'errors': [],
            'warnings': [],
            'file_info': {}
        }

        try:
            if not os.path.exists(trace_file):
                results['errors'].append(f"文件不存在: {trace_file}")
                return results

            file_size = os.path.getsize(trace_file)
            results['file_info']['size'] = file_size
            results['file_info']['path'] = trace_file

            with open(trace_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()
                results['file_info']['total_lines'] = len(lines)

                for line_num, line in enumerate(lines, 1):
                    line = line.strip()

                    # 跳过空行和注释
                    if not line:
                        continue
                    if line.startswith('#'):
                        continue

                    # 解析行
                    line_result = self._parse_line(line, line_num)

                    if line_result['type'] == 'instruction':
                        results['instructions'].append(line_result)
                    elif line_result['type'] == 'register':
                        results['registers'].append(line_result)
                    elif line_result['type'] == 'memory':
                        results['memory_accesses'].append(line_result)

                    if line_result.get('error'):
                        results['errors'].append(f"第{line_num}行: {line_result['error']}")
                    if line_result.get('warning'):
                        results['warnings'].append(f"第{line_num}行: {line_result['warning']}")

            results['success'] = len(results['errors']) == 0
            results['file_info']['parsed_instructions'] = len(results['instructions'])
            results['file_info']['parsed_registers'] = len(results['registers'])
            results['file_info']['parsed_memory'] = len(results['memory_accesses'])

            self.logger.info(f"解析完成: {len(results['instructions'])} 指令, {len(results['errors'])} 错误")

        except Exception as e:
            results['errors'].append(f"解析过程失败: {e}")
            self.logger.error(f"解析跟踪文件失败: {e}")

        return results

    def _parse_line(self, line: str, line_num: int) -> Dict[str, Any]:
        """解析单行"""
        result = {
            'original_line': line,
            'line_number': line_num,
            'type': 'unknown',
            'error': None,
            'warning': None
        }

        try:
            # 使用更灵活的分割方法
            parts = [part.strip() for part in line.split(':')]

            if len(parts) < 2:
                result['error'] = "字段不足"
                return result

            record_type = parts[0].lower()

            if record_type == 'i':  # 指令
                return self._parse_instruction(parts, result)
            elif record_type == 'r':  # 寄存器
                return self._parse_register(parts, result)
            elif record_type == 'mr':  # 内存读取
                return self._parse_memory(parts, result)
            else:
                result['error'] = f"未知记录类型: {record_type}"
                return result

        except Exception as e:
            result['error'] = f"解析异常: {e}"
            return result

    def _parse_instruction(self, parts: List[str], result: Dict[str, Any]) -> Dict[str, Any]:
        """解析指令行"""
        if len(parts) < 4:
            result['error'] = "指令行字段不足"
            return result

        result['type'] = 'instruction'

        try:
            # 地址
            address_str = self._clean_hex(parts[1])
            if not self._is_valid_hex(address_str):
                result['error'] = f"无效的地址: {parts[1]}"
                return result
            result['address'] = int(address_str, 16)

            # 大小
            try:
                result['size'] = int(parts[2])
            except ValueError:
                result['error'] = f"无效的大小: {parts[2]}"
                return result

            # 数据
            data_str = self._clean_hex(parts[3])
            if not self._is_valid_hex(data_str):
                result['error'] = f"无效的指令数据: {parts[3]}"
                return result

            # 检查数据长度与大小是否匹配
            expected_bytes = result['size']
            actual_bytes = len(data_str) // 2  # 每个字节2个十六进制字符

            if actual_bytes != expected_bytes:
                result['warning'] = f"数据长度不匹配: 预期{expected_bytes}字节, 实际{actual_bytes}字节"

            result['data_hex'] = data_str
            result['data_bytes'] = bytes.fromhex(data_str)

            # 简单的指令分析
            result['instruction_info'] = self._analyze_instruction(data_str)

        except Exception as e:
            result['error'] = f"指令解析失败: {e}"

        return result

    def _parse_register(self, parts: List[str], result: Dict[str, Any]) -> Dict[str, Any]:
        """解析寄存器行"""
        if len(parts) < 14:
            result['warning'] = "寄存器行字段可能不足"

        result['type'] = 'register'
        result['register_values'] = []

        try:
            # 第一个字段是执行ID
            exec_id_str = self._clean_hex(parts[1])
            if self._is_valid_hex(exec_id_str):
                result['execution_id'] = int(exec_id_str, 16)
            else:
                result['execution_id'] = 1  # 默认值

            # 寄存器值 (从第2个字段开始)
            register_names = ['rip', 'rax', 'rbx', 'rcx', 'rdx', 'rsp', 'rbp', 'rsi', 'rdi', 'r8', 'r9', 'r10', 'r11']

            for i, reg_name in enumerate(register_names):
                if i + 2 < len(parts):
                    value_str = self._clean_hex(parts[i + 2])
                    if self._is_valid_hex(value_str):
                        result['register_values'].append({
                            'register': reg_name,
                            'value_hex': value_str,
                            'value_int': int(value_str, 16)
                        })
                    else:
                        result['register_values'].append({
                            'register': reg_name,
                            'value_hex': value_str,
                            'value_int': 0,  # 默认值
                            'warning': '无效的十六进制值'
                        })
                else:
                    # 字段不足,使用默认值
                    result['register_values'].append({
                        'register': reg_name,
                        'value_hex': '0',
                        'value_int': 0
                    })

        except Exception as e:
            result['error'] = f"寄存器解析失败: {e}"

        return result

    def _parse_memory(self, parts: List[str], result: Dict[str, Any]) -> Dict[str, Any]:
        """解析内存访问行"""
        if len(parts) < 4:
            result['error'] = "内存行字段不足"
            return result

        result['type'] = 'memory'

        try:
            # 地址
            address_str = self._clean_hex(parts[1])
            if not self._is_valid_hex(address_str):
                result['error'] = f"无效的内存地址: {parts[1]}"
                return result
            result['address'] = int(address_str, 16)

            # 大小
            try:
                result['size'] = int(parts[2])
            except ValueError:
                result['error'] = f"无效的大小: {parts[2]}"
                return result

            # 数据
            data_str = self._clean_hex(parts[3])
            if not self._is_valid_hex(data_str):
                result['error'] = f"无效的内存数据: {parts[3]}"
                return result

            result['data_hex'] = data_str
            result['data_int'] = int(data_str, 16)

        except Exception as e:
            result['error'] = f"内存解析失败: {e}"

        return result

    def _clean_hex(self, value: str) -> str:
        """清理十六进制值"""
        if not value:
            return "0"

        # 转换为小写,移除0x前缀和空格
        clean = value.strip().lower()
        if clean.startswith('0x'):
            clean = clean[2:]

        # 只保留十六进制字符
        clean = ''.join(c for c in clean if c in '0123456789abcdef')

        return clean if clean else "0"

    def _is_valid_hex(self, value: str) -> bool:
        """检查是否为有效的十六进制字符串"""
        if not value:
            return False
        return all(c in '0123456789abcdef' for c in value)

    def _analyze_instruction(self, data_hex: str) -> Dict[str, Any]:
        """简单指令分析"""
        if not data_hex:
            return {'type': 'unknown'}

        # 常见的x86_64指令前缀
        prefixes = {
            '31': 'xor',  # xor reg, reg
            '89': 'mov',  # mov reg, reg
            '8b': 'mov',  # mov reg, [mem]
            '01': 'add',  # add [mem], reg
            '03': 'add',  # add reg, [mem]
            '29': 'sub',  # sub [mem], reg
            '2b': 'sub',  # sub reg, [mem]
            'ff': 'call/jmp',  # call/jmp [mem]
            'e8': 'call',  # call rel32
            'e9': 'jmp',  # jmp rel32
            '90': 'nop',  # nop
            'c3': 'ret',  # ret
            'f3': 'prefix',  # rep prefix
            'f2': 'prefix',  # repne prefix
        }

        # 检查前缀
        for prefix, inst_type in prefixes.items():
            if data_hex.startswith(prefix):
                return {'type': inst_type, 'prefix': prefix}

        return {'type': 'unknown'}


def test_parser():
    """测试解析器"""
    parser = RobustTraceParser()

    # 测试文件
    test_files = [
        "vmp_traces/minimal_test.trace",
        "vmp_traces/clean_test.trace",
        "vmp_traces/nop_test.trace"
    ]

    for test_file in test_files:
        if os.path.exists(test_file):
            print(f"\n分析文件: {test_file}")
            results = parser.parse_trace_file(test_file)

            print(f"  成功: {results['success']}")
            print(f"  指令: {len(results['instructions'])}")
            print(f"  寄存器同步: {len(results['registers'])}")
            print(f"  内存访问: {len(results['memory_accesses'])}")
            print(f"  错误: {len(results['errors'])}")
            print(f"  警告: {len(results['warnings'])}")

            if results['errors']:
                print("  错误详情:")
                for error in results['errors'][:3]:  # 只显示前3个错误
                    print(f"    - {error}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    test_parser()


5、trace_parser.py
[Python] 纯文本查看 复制代码
# modules/core/trace_parser.py 修复版本

import logging
import struct
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import capstone as cs
from capstone import CS_ARCH_X86, CS_MODE_32, CS_MODE_64


class TraceParser:
    """跟踪文件解析器 - 修复VMProtect跟踪格式解析"""

    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # 初始化Capstone反汇编器
        arch = self.config.get('architecture', 'x86')
        mode = self.config.get('mode', '64')

        if arch.lower() == 'x86':
            if mode == '32':
                self.md = cs.Cs(CS_ARCH_X86, CS_MODE_32)
            else:
                self.md = cs.Cs(CS_ARCH_X86, CS_MODE_64)
        else:
            # 默认使用x86-64
            self.md = cs.Cs(CS_ARCH_X86, CS_MODE_64)

        # 设置反汇编选项
        self.md.detail = True
        self.md.skipdata = True

        self.logger.info("Capstone反汇编器初始化成功")

    def parse_trace(self, trace_file: str) -> List[Dict[str, Any]]:
        """解析跟踪文件 - 修复版本"""
        try:
            self.logger.info(f"开始解析跟踪文件: {trace_file}")

            # 检查文件是否存在
            trace_path = Path(trace_file)
            if not trace_path.exists():
                self.logger.error(f"跟踪文件不存在: {trace_file}")
                return []

            # 读取文件内容
            with open(trace_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().strip()

            if not content:
                self.logger.warning("跟踪文件为空")
                return []

            # 解析跟踪文件
            instructions = self._parse_trace_content_vmp_format(content)
            self.logger.info(f"成功解析 {len(instructions)} 条指令")

            return instructions

        except Exception as e:
            self.logger.error(f"解析跟踪文件失败: {e}")
            return []

    def _parse_trace_content_vmp_format(self, content: str) -> List[Dict[str, Any]]:
        """解析VMProtect格式的跟踪文件内容"""
        instructions = []
        lines = content.split('\n')
        current_registers = {}

        for line_num, line in enumerate(lines, 1):
            line = line.strip()
            if not line:
                continue

            try:
                # 解析寄存器状态行
                if line.startswith('r:'):
                    current_registers = self._parse_register_line(line)
                    continue

                # 解析指令行
                if line.startswith('i:'):
                    instruction = self._parse_vmp_instruction_line(line, line_num, current_registers.copy())
                    if instruction:
                        instructions.append(instruction)

            except Exception as e:
                self.logger.warning(f"解析第 {line_num} 行失败: {e}")
                continue

        return instructions

    def _parse_register_line(self, line: str) -> Dict[str, int]:
        """解析寄存器状态行"""
        registers = {}
        try:
            # 格式: r:thread_id:rax:rbx:rcx:rdx:rsi:rdi:rsp:rbp:r8:r9:r10:r11
            parts = line[2:].split(':')
            if len(parts) >= 13:
                register_names = ['thread_id', 'rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi',
                                  'rsp', 'rbp', 'r8', 'r9', 'r10', 'r11']

                for i, name in enumerate(register_names):
                    if i < len(parts):
                        try:
                            # 解析十六进制值
                            value_str = parts[i].strip()
                            if value_str.startswith('0x'):
                                registers[name] = int(value_str, 16)
                            else:
                                registers[name] = int(value_str)
                        except ValueError:
                            registers[name] = 0
        except Exception as e:
            self.logger.debug(f"解析寄存器行失败: {e}")

        return registers

    def _parse_vmp_instruction_line(self, line: str, line_num: int, registers: Dict[str, int]) -> Optional[
        Dict[str, Any]]:
        """解析VMProtect指令行"""
        try:
            # 格式: i:address:length:machine_code [# comment]
            parts = line[2:].split(':')
            if len(parts) < 3:
                return None

            # 解析地址
            address_str = parts[0].strip()
            try:
                if address_str.startswith('0x'):
                    address = int(address_str, 16)
                else:
                    address = int(address_str)
            except ValueError:
                address = line_num

            # 解析长度
            length_str = parts[1].strip()
            length = int(length_str) if length_str.isdigit() else 0

            # 解析机器码
            machine_code_part = parts[2].strip()
            # 移除注释
            if '#' in machine_code_part:
                machine_code = machine_code_part.split('#')[0].strip()
            else:
                machine_code = machine_code_part

            # 创建指令字典
            instruction = {
                'address': address,
                'length': length,
                'machine_code': machine_code,
                'register_state': registers,
                'line_number': line_num,
                'raw_line': line
            }

            # 添加反汇编和分析信息
            instruction.update(self._analyze_instruction(machine_code, address))

            return instruction

        except Exception as e:
            self.logger.debug(f"解析指令行失败 (行 {line_num}): {e}")
            return None

    def _analyze_instruction(self, machine_code: str, address: int) -> Dict[str, Any]:
        """分析指令"""
        analysis = {
            'opcode': 'unknown',
            'operands': '',
            'mnemonic': 'unknown',
            'type': 'other',
            'category': 'general',
            'is_control_flow': False,
            'is_arithmetic': False,
            'is_logical': False,
            'is_memory_access': False
        }

        try:
            # 尝试使用Capstone反汇编
            if machine_code and len(machine_code) >= 2:
                try:
                    # 将十六进制字符串转换为字节
                    code_bytes = bytes.fromhex(machine_code.replace(' ', ''))

                    # 反汇编
                    for instr in self.md.disasm(code_bytes, address):
                        analysis.update({
                            'opcode': instr.mnemonic,
                            'operands': instr.op_str,
                            'mnemonic': instr.mnemonic,
                            'type': self._classify_instruction_type(instr.mnemonic),
                            'category': self._categorize_instruction(instr.mnemonic),
                            'is_control_flow': self._is_control_flow(instr),
                            'is_arithmetic': self._is_arithmetic(instr.mnemonic),
                            'is_logical': self._is_logical(instr.mnemonic),
                            'is_memory_access': any(op.type == cs.x86.X86_OP_MEM for op in instr.operands)
                        })
                        break
                except Exception as e:
                    self.logger.debug(f"反汇编失败: {e}")
                    # 使用基础分析作为备用
                    analysis.update(self._basic_instruction_analysis(machine_code))

        except Exception as e:
            self.logger.debug(f"指令分析失败: {e}")

        return analysis

    def _basic_instruction_analysis(self, machine_code: str) -> Dict[str, Any]:
        """基础指令分析"""
        # 简化的指令类型检测
        machine_code_lower = machine_code.lower()

        analysis = {
            'opcode': machine_code_lower[:6],  # 取前6个字符作为操作码
            'operands': '',
            'mnemonic': 'unknown'
        }

        # 基于机器码模式的基础分类
        if machine_code_lower.startswith(('e8', 'e9', 'eb')):  # call, jmp, jmp short
            analysis.update({
                'type': 'control_flow',
                'category': 'control_flow',
                'is_control_flow': True
            })
        elif machine_code_lower.startswith(('74', '75', '0f84', '0f85')):  # je, jne, jz, jnz
            analysis.update({
                'type': 'control_flow',
                'category': 'control_flow',
                'is_control_flow': True
            })
        elif machine_code_lower.startswith(('c3', 'c2')):  # ret, retn
            analysis.update({
                'type': 'control_flow',
                'category': 'control_flow',
                'is_control_flow': True
            })
        elif machine_code_lower.startswith(('50', '51', '52', '53', '54', '55', '56', '57')):  # push
            analysis.update({
                'type': 'stack',
                'category': 'data_transfer'
            })
        elif machine_code_lower.startswith(('58', '59', '5a', '5b', '5c', '5d', '5e', '5f')):  # pop
            analysis.update({
                'type': 'stack',
                'category': 'data_transfer'
            })
        elif machine_code_lower.startswith(('31', '33')):  # xor, xor
            analysis.update({
                'type': 'logical',
                'category': 'logical',
                'is_logical': True
            })
        else:
            analysis.update({
                'type': 'other',
                'category': 'general'
            })

        return analysis

    def _classify_instruction_type(self, mnemonic: str) -> str:
        """分类指令类型"""
        mnemonic_lower = mnemonic.lower()

        control_flow = ['jmp', 'call', 'ret', 'je', 'jne', 'jg', 'jl', 'ja', 'jb']
        arithmetic = ['add', 'sub', 'mul', 'div', 'inc', 'dec']
        logical = ['and', 'or', 'xor', 'not', 'shl', 'shr']
        data_transfer = ['mov', 'push', 'pop', 'lea']
        stack = ['push', 'pop', 'enter', 'leave']

        if mnemonic_lower in control_flow:
            return 'control_flow'
        elif mnemonic_lower in arithmetic:
            return 'arithmetic'
        elif mnemonic_lower in logical:
            return 'logical'
        elif mnemonic_lower in data_transfer:
            return 'data_transfer'
        elif mnemonic_lower in stack:
            return 'stack'
        else:
            return 'other'

    def _categorize_instruction(self, mnemonic: str) -> str:
        """分类指令到更广泛的类别"""
        mnemonic_lower = mnemonic.lower()

        if mnemonic_lower in ['jmp', 'call', 'ret']:
            return 'control_flow'
        elif mnemonic_lower in ['mov', 'push', 'pop']:
            return 'data_transfer'
        elif mnemonic_lower in ['add', 'sub', 'mul', 'div']:
            return 'arithmetic'
        elif mnemonic_lower in ['and', 'or', 'xor']:
            return 'logical'
        else:
            return 'general'

    def _is_control_flow(self, instr) -> bool:
        """检查是否为控制流指令"""
        return (instr.group(cs.CS_GRP_JUMP) or
                instr.group(cs.CS_GRP_CALL) or
                instr.group(cs.CS_GRP_RET) or
                instr.group(cs.CS_GRP_BRANCH_RELATIVE))

    def _is_arithmetic(self, mnemonic: str) -> bool:
        """检查是否为算术指令"""
        return mnemonic.lower() in ['add', 'sub', 'mul', 'div', 'inc', 'dec']

    def _is_logical(self, mnemonic: str) -> bool:
        """检查是否为逻辑指令"""
        return mnemonic.lower() in ['and', 'or', 'xor', 'not', 'shl', 'shr']

    # 保留原有的元数据和验证方法
    def parse_metadata(self, trace_file: str) -> Dict[str, Any]:
        """解析跟踪文件的元数据"""
        try:
            trace_path = Path(trace_file)
            if not trace_path.exists():
                return {}

            stats = trace_path.stat()
            metadata = {
                'file_size': stats.st_size,
                'modified_time': stats.st_mtime,
                'file_path': str(trace_path.absolute()),
                'file_name': trace_path.name
            }

            # 尝试读取第一行获取更多信息
            with open(trace_path, 'r', encoding='utf-8', errors='ignore') as f:
                first_line = f.readline().strip()
                metadata['first_line'] = first_line[:100]

            return metadata

        except Exception as e:
            self.logger.error(f"解析元数据失败: {e}")
            return {}

    def validate_trace_format(self, trace_file: str) -> Tuple[bool, str]:
        """验证跟踪文件格式"""
        try:
            trace_path = Path(trace_file)
            if not trace_path.exists():
                return False, "文件不存在"

            with open(trace_path, 'r', encoding='utf-8', errors='ignore') as f:
                first_few_lines = [f.readline().strip() for _ in range(5)]

            # 验证VMProtect格式
            valid_lines = 0
            for line in first_few_lines:
                if line and (line.startswith('i:') or line.startswith('r:')):
                    valid_lines += 1

            if valid_lines >= 2:
                return True, "VMProtect跟踪格式有效"
            else:
                return False, "文件格式不符合VMProtect跟踪格式"

        except Exception as e:
            return False, f"验证失败: {e}"


6、vm_detector.py
[Python] 纯文本查看 复制代码
import logging
import re
import math
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict, Counter


class VMDetector:
    """VMProtect特征检测器 - 修复版本"""

    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # VMProtect特征签名
        self.vmp_signatures = self._init_vmp_signatures()
        self.detection_results = {}

    def _init_vmp_signatures(self) -> Dict[str, Any]:
        """初始化VMProtect特征签名"""
        return {
            'vm_entry_patterns': [
                r'pushad',
                r'pushfd',
                r'sub.*esp.*0x[0-9a-f]+',
                r'mov.*ebp.*esp',
                r'enter.*0x[0-9a-f]+'
            ],
            'vm_dispatch_patterns': [
                r'mov.*eax.*\[ebp\+',
                r'mov.*ebx.*\[esp\+',
                r'jmp.*\[eax\*4\+',
                r'jmp.*\[ebx\*4\+',
                r'ff.*\[ebp\+'
            ],
            'vm_handler_patterns': [
                r'mov.*\[ebp\+.*eax',
                r'mov.*eax.*\[ebp\+',
                r'add.*eax.*ebx',
                r'xor.*eax.*ebx'
            ],
            'obfuscation_patterns': [
                r'jmp.*eax',
                r'call.*ebx',
                r'push.*eax.*pop.*ebx',
                r'xchg.*eax.*ebx'
            ],
            'anti_analysis_patterns': [
                r'int3',
                r'int 0x2d',
                r'icebp',
                r'rdtsc',
                r'cpuid'
            ]
        }

    def detect_vm_features(self, instructions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """检测VMProtect特征 - 主入口方法"""
        try:
            self.logger.info(f"开始VM特征检测,共 {len(instructions)} 条指令")

            if not instructions:
                return self._get_empty_result()

            results = {
                'has_virtualization': False,
                'confidence_score': 0.0,
                'vm_entries': [],
                'vm_dispatchers': [],
                'vm_handlers': [],
                'vm_exits': [],
                'has_anti_debug': False,
                'has_obfuscated_stack': False,
                'has_virtual_registers': False,
                'detected_patterns': [],
                'protection_level': 'unknown',
                'analysis_recommendations': []
            }

            # 检测VM入口
            vm_entries = self._detect_vm_entries(instructions)
            results['vm_entries'] = vm_entries

            # 检测VM分发器
            vm_dispatchers = self._detect_vm_dispatchers(instructions)
            results['vm_dispatchers'] = vm_dispatchers

            # 检测VM处理器
            vm_handlers = self._detect_vm_handlers(instructions)
            results['vm_handlers'] = vm_handlers

            # 检测VM出口
            vm_exits = self._detect_vm_exits(instructions)
            results['vm_exits'] = vm_exits

            # 检测反调试技术
            results['has_anti_debug'] = self._detect_anti_debug(instructions)

            # 检测堆栈混淆
            results['has_obfuscated_stack'] = self._detect_obfuscated_stack(instructions)

            # 检测虚拟寄存器
            results['has_virtual_registers'] = self._detect_virtual_registers(instructions)

            # 计算置信度分数
            confidence_score = self._calculate_confidence_score(results)
            results['confidence_score'] = confidence_score

            # 确定是否有虚拟化
            results['has_virtualization'] = confidence_score > 0.3

            # 估算保护级别
            results['protection_level'] = self._estimate_protection_level(confidence_score)

            # 收集检测到的模式
            results['detected_patterns'] = self._collect_detected_patterns(results)

            # 生成建议
            results['analysis_recommendations'] = self._generate_recommendations(results)

            self.detection_results = results
            self.logger.info(f"VM特征检测完成,置信度: {confidence_score:.2f}")

            return results

        except Exception as e:
            self.logger.error(f"VM特征检测失败: {e}")
            return self._get_empty_result()

    def _get_empty_result(self) -> Dict[str, Any]:
        """获取空的检测结果"""
        return {
            'has_virtualization': False,
            'confidence_score': 0.0,
            'vm_entries': [],
            'vm_dispatchers': [],
            'vm_handlers': [],
            'vm_exits': [],
            'has_anti_debug': False,
            'has_obfuscated_stack': False,
            'has_virtual_registers': False,
            'detected_patterns': [],
            'protection_level': 'unknown',
            'analysis_recommendations': []
        }

    def _detect_vm_entries(self, instructions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """检测VM入口点"""
        entries = []

        for i, instr in enumerate(instructions):
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            full_instruction = f"{opcode} {operands}"

            # 检查VM入口模式
            for pattern in self.vmp_signatures['vm_entry_patterns']:
                if re.search(pattern, full_instruction, re.IGNORECASE):
                    entry_info = {
                        'address': instr.get('address', 0),
                        'instruction': full_instruction,
                        'pattern': pattern,
                        'confidence': 'high',
                        'context': self._get_instruction_context(instructions, i)
                    }
                    entries.append(entry_info)
                    break

            # 特定的VM入口指令
            if opcode in ['PUSHAD', 'PUSHFD']:
                # 检查后续指令是否为栈操作
                if i < len(instructions) - 2:
                    next_instr = instructions[i + 1]
                    next_opcode = next_instr.get('opcode', '').upper()
                    if next_opcode in ['SUB', 'MOV'] and 'ESP' in next_instr.get('operands', ''):
                        entry_info = {
                            'address': instr.get('address', 0),
                            'instruction': full_instruction,
                            'pattern': '寄存器保存+栈设置',
                            'confidence': 'high',
                            'context': self._get_instruction_context(instructions, i)
                        }
                        entries.append(entry_info)

        return entries

    def _detect_vm_dispatchers(self, instructions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """检测VM分发器"""
        dispatchers = []

        for i, instr in enumerate(instructions):
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            full_instruction = f"{opcode} {operands}"

            # 检查VM分发器模式
            for pattern in self.vmp_signatures['vm_dispatch_patterns']:
                if re.search(pattern, full_instruction, re.IGNORECASE):
                    dispatcher_info = {
                        'address': instr.get('address', 0),
                        'instruction': full_instruction,
                        'pattern': pattern,
                        'confidence': 'medium',
                        'context': self._get_instruction_context(instructions, i)
                    }
                    dispatchers.append(dispatcher_info)
                    break

            # 检查间接跳转
            if opcode == 'JMP' and '[' in operands:
                dispatcher_info = {
                    'address': instr.get('address', 0),
                    'instruction': full_instruction,
                    'pattern': '间接跳转',
                    'confidence': 'medium',
                    'context': self._get_instruction_context(instructions, i)
                }
                dispatchers.append(dispatcher_info)

        return dispatchers

    def _detect_vm_handlers(self, instructions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """检测VM处理器"""
        handlers = []

        for i, instr in enumerate(instructions):
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            full_instruction = f"{opcode} {operands}"

            # 检查VM处理器模式
            for pattern in self.vmp_signatures['vm_handler_patterns']:
                if re.search(pattern, full_instruction, re.IGNORECASE):
                    handler_info = {
                        'address': instr.get('address', 0),
                        'instruction': full_instruction,
                        'pattern': pattern,
                        'confidence': 'medium',
                        'context': self._get_instruction_context(instructions, i)
                    }
                    handlers.append(handler_info)
                    break

            # 检查栈操作模式
            if opcode in ['PUSH', 'POP', 'MOV'] and any(reg in operands for reg in ['[EBP', '[ESP']):
                handler_info = {
                    'address': instr.get('address', 0),
                    'instruction': full_instruction,
                    'pattern': '栈操作',
                    'confidence': 'low',
                    'context': self._get_instruction_context(instructions, i)
                }
                handlers.append(handler_info)

        return handlers

    def _detect_vm_exits(self, instructions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """检测VM出口点"""
        exits = []

        for i, instr in enumerate(instructions):
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            full_instruction = f"{opcode} {operands}"

            # 检查VM出口指令
            if opcode in ['POPAD', 'POPFD', 'LEAVE', 'RET']:
                # 检查前面的指令是否为栈恢复
                exit_info = {
                    'address': instr.get('address', 0),
                    'instruction': full_instruction,
                    'pattern': 'VM出口',
                    'confidence': 'high',
                    'context': self._get_instruction_context(instructions, i)
                }
                exits.append(exit_info)

        return exits

    def _detect_anti_debug(self, instructions: List[Dict[str, Any]]) -> bool:
        """检测反调试技术"""
        for instr in instructions:
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()
            full_instruction = f"{opcode} {operands}"

            # 检查反调试模式
            for pattern in self.vmp_signatures['anti_analysis_patterns']:
                if re.search(pattern, full_instruction, re.IGNORECASE):
                    return True

            # 特定的反调试指令
            if opcode in ['INT3', 'RDTSC', 'CPUID']:
                return True

        return False

    def _detect_obfuscated_stack(self, instructions: List[Dict[str, Any]]) -> bool:
        """检测堆栈混淆"""
        stack_operations = 0
        complex_stack_ops = 0

        for instr in instructions:
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()

            # 统计栈操作
            if opcode in ['PUSH', 'POP']:
                stack_operations += 1

            # 检查复杂的栈操作
            if opcode == 'MOV' and ('[EBP' in operands or '[ESP' in operands):
                complex_stack_ops += 1

        # 如果栈操作频繁且复杂,认为有堆栈混淆
        total_instructions = len(instructions)
        if total_instructions > 0:
            stack_density = stack_operations / total_instructions
            complex_ratio = complex_stack_ops / max(stack_operations, 1)

            if stack_density > 0.1 and complex_ratio > 0.3:
                return True

        return False

    def _detect_virtual_registers(self, instructions: List[Dict[str, Any]]) -> bool:
        """检测虚拟寄存器使用"""
        register_usage = defaultdict(int)

        for instr in instructions:
            opcode = instr.get('opcode', '').upper()
            operands = instr.get('operands', '').upper()

            # 统计寄存器使用频率
            for reg in ['EAX', 'EBX', 'ECX', 'EDX', 'ESI', 'EDI', 'EBP', 'ESP']:
                if reg in operands:
                    register_usage[reg] += 1

        # 如果某些寄存器使用异常频繁,可能是虚拟寄存器
        if register_usage:
            total_usage = sum(register_usage.values())
            max_usage = max(register_usage.values())

            if max_usage / total_usage > 0.4:  # 某个寄存器使用超过40%
                return True

        return False

    def _get_instruction_context(self, instructions: List[Dict[str, Any]], index: int, context_size: int = 3) -> List[
        str]:
        """获取指令上下文"""
        start = max(0, index - context_size)
        end = min(len(instructions), index + context_size + 1)

        context = []
        for i in range(start, end):
            instr = instructions[i]
            context.append(f"{instr.get('opcode', '')} {instr.get('operands', '')}")

        return context

    def _calculate_confidence_score(self, results: Dict[str, Any]) -> float:
        """计算置信度分数"""
        score = 0.0

        # VM入口贡献
        vm_entries = results.get('vm_entries', [])
        for entry in vm_entries:
            if entry.get('confidence') == 'high':
                score += 0.3
            elif entry.get('confidence') == 'medium':
                score += 0.2
            else:
                score += 0.1

        # VM分发器贡献
        vm_dispatchers = results.get('vm_dispatchers', [])
        for dispatcher in vm_dispatchers:
            if dispatcher.get('confidence') == 'high':
                score += 0.2
            elif dispatcher.get('confidence') == 'medium':
                score += 0.15
            else:
                score += 0.05

        # VM处理器贡献
        vm_handlers = results.get('vm_handlers', [])
        score += len(vm_handlers) * 0.05

        # VM出口贡献
        vm_exits = results.get('vm_exits', [])
        score += len(vm_exits) * 0.1

        # 其他特征贡献
        if results.get('has_anti_debug'):
            score += 0.1

        if results.get('has_obfuscated_stack'):
            score += 0.15

        if results.get('has_virtual_registers'):
            score += 0.1

        return min(score, 1.0)

    def _estimate_protection_level(self, confidence_score: float) -> str:
        """估算保护级别"""
        if confidence_score > 0.7:
            return "high"
        elif confidence_score > 0.4:
            return "medium"
        elif confidence_score > 0.1:
            return "low"
        else:
            return "none"

    def _collect_detected_patterns(self, results: Dict[str, Any]) -> List[str]:
        """收集检测到的模式"""
        patterns = []

        # 从各个检测结果中提取模式
        for entry in results.get('vm_entries', []):
            pattern = entry.get('pattern', '')
            if pattern and pattern not in patterns:
                patterns.append(pattern)

        for dispatcher in results.get('vm_dispatchers', []):
            pattern = dispatcher.get('pattern', '')
            if pattern and pattern not in patterns:
                patterns.append(pattern)

        for handler in results.get('vm_handlers', []):
            pattern = handler.get('pattern', '')
            if pattern and pattern not in patterns:
                patterns.append(pattern)

        # 添加特殊模式
        if results.get('has_anti_debug'):
            patterns.append('anti_debug')

        if results.get('has_obfuscated_stack'):
            patterns.append('obfuscated_stack')

        if results.get('has_virtual_registers'):
            patterns.append('virtual_registers')

        return patterns

    def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """生成分析建议"""
        recommendations = []

        confidence_score = results.get('confidence_score', 0)
        protection_level = results.get('protection_level', 'none')

        if protection_level == 'high':
            recommendations.append("检测到高强度VM保护,建议使用深度符号执行")
            recommendations.append("需要分析VM分发器和处理器逻辑")
        elif protection_level == 'medium':
            recommendations.append("中等强度VM保护,标准分析方法可能有效")
            recommendations.append("建议结合动态分析验证结果")
        elif protection_level == 'low':
            recommendations.append("低强度VM保护,可以尝试快速分析方法")
        else:
            recommendations.append("未检测到明显的VM保护特征")

        # 基于具体特征的推荐
        if results.get('has_anti_debug'):
            recommendations.append("检测到反调试技术,建议在隔离环境中分析")

        if results.get('has_obfuscated_stack'):
            recommendations.append("检测到堆栈混淆,需要详细的堆栈跟踪分析")

        if results.get('has_virtual_registers'):
            recommendations.append("检测到虚拟寄存器,需要寄存器映射分析")

        return recommendations

    def get_detailed_report(self) -> Dict[str, Any]:
        """获取详细检测报告"""
        if not self.detection_results:
            return {'error': 'No detection results available'}

        return {
            'summary': {
                'virtualization_detected': self.detection_results.get('has_virtualization', False),
                'confidence_score': self.detection_results.get('confidence_score', 0),
                'protection_level': self.detection_results.get('protection_level', 'unknown')
            },
            'details': {
                'vm_entries_count': len(self.detection_results.get('vm_entries', [])),
                'vm_dispatchers_count': len(self.detection_results.get('vm_dispatchers', [])),
                'vm_handlers_count': len(self.detection_results.get('vm_handlers', [])),
                'vm_exits_count': len(self.detection_results.get('vm_exits', [])),
                'anti_debug_detected': self.detection_results.get('has_anti_debug', False),
                'obfuscated_stack_detected': self.detection_results.get('has_obfuscated_stack', False),
                'virtual_registers_detected': self.detection_results.get('has_virtual_registers', False)
            },
            'patterns': self.detection_results.get('detected_patterns', []),
            'recommendations': self.detection_results.get('analysis_recommendations', [])
        }

免费评分

参与人数 1热心值 +1 收起 理由
Jokerboxs + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

rwxqaz098 发表于 2025-12-2 11:19
代码看不明白啊,老太太的裹脚布又臭又长。
Henglie 发表于 2025-12-2 18:26
真是新手吗?我感觉玩壳的都不新手。下次其实可以把成品.py文件扔附件里
paituo 发表于 2025-12-3 14:00
看日志中这是啥?

开始分析跟踪文件: vmp_traces\sample1.vmp.trace (模式: deep)

wanliu 发表于 2025-12-5 17:43
你要是新手我应该还没开智
 楼主| Jackdeng 发表于 2025-12-5 17:58
看日志中这是啥?

开始分析跟踪文件: vmp_traces\sample1.vmp.trace (模式: deep)==》可以接大模型进行分析!
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-5-9 16:25

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表