吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 270|回复: 1
收起左侧

[Python 原创] 基于PCR532 IC卡一键定点写入器快速版本!快速

[复制链接]
330 发表于 2026-4-29 18:05
本帖最后由 苏紫方璇 于 2026-5-5 14:53 编辑

python开发代码如下

[Python] 纯文本查看 复制代码
import json
import os
import time
import ctypes
import threading
import subprocess
import tempfile
import urllib.request
import tkinter as tk
from tkinter import filedialog, messagebox, ttk

import serial
import serial.tools.list_ports

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
KEY_LIBRARY_FILE = os.path.join(BASE_DIR, "key_library.json")
DEBUG_LOG_FILE = os.path.join(BASE_DIR, "write_debug.log")

DRIVER_EXE = "CH341SER.EXE"
DRIVER_URL = "https://www.wch.cn/downloads/file/65.html?time=2023-03-16%2022:57:59"
BAUD_RATE = 115200

PN532_PREAMBLE = bytes.fromhex("00 00 FF")
PN532_POSTAMBLE = b"\x00"
PN532_ACK = bytes.fromhex("00 00 FF 00 FF 00")
PN532_HOST_TFI = 0xD4
PN532_PN532_TFI = 0xD5

MIFARE_AUTH_A = 0x60
MIFARE_AUTH_B = 0x61
MIFARE_WRITE_16 = 0xA0


def is_admin():
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception:
        return False


def list_serial_ports():
    return list(serial.tools.list_ports.comports())


def has_ch34x_device():
    try:
        result = subprocess.run(
            [
                "powershell",
                "-NoProfile",
                "-Command",
                (
                    "Get-PnpDevice -PresentOnly | "
                    "Where-Object { $_.FriendlyName -match 'CH340|CH341|USB-SERIAL CH340|USB Serial' } | "
                    "Select-Object -First 1 -ExpandProperty FriendlyName"
                ),
            ],
            capture_output=True,
            text=True,
            timeout=8,
            check=False,
        )
        return bool(result.stdout.strip())
    except Exception:
        return False


def download_driver_package(target_path):
    request = urllib.request.Request(DRIVER_URL, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(request, timeout=30) as response:
        data = response.read()
    with open(target_path, "wb") as f:
        f.write(data)


def normalize_key_hex(text):
    cleaned = "".join(ch for ch in text if ch in "0123456789abcdefABCDEF").upper()
    if len(cleaned) != 12:
        raise ValueError("密钥必须是 12 位十六进制,例如 FFFFFFFFFFFF")
    return cleaned


def format_hex_preview(data, bytes_per_line=16):
    lines = []
    for offset in range(0, len(data), bytes_per_line):
        chunk = data[offset : offset + bytes_per_line]
        hex_part = " ".join(f"{byte:02X}" for byte in chunk)
        lines.append(f"{offset:04X}  {hex_part}")
    return "\n".join(lines) if lines else "无数据"


def is_sector_trailer(block_index):
    return (block_index + 1) % 4 == 0


def extract_keys_from_dump(data, source_name):
    entries = []
    total_blocks = len(data) // 16
    for block_index in range(total_blocks):
        if not is_sector_trailer(block_index):
            continue
        start = block_index * 16
        trailer = data[start : start + 16]
        if len(trailer) < 16:
            continue
        sector_index = block_index // 4
        key_a = trailer[:6].hex().upper()
        key_b = trailer[10:16].hex().upper()
        entries.append(
            {
                "key": key_a,
                "type": "A",
                "source": f"{source_name} sector {sector_index} trailer",
            }
        )
        entries.append(
            {
                "key": key_b,
                "type": "B",
                "source": f"{source_name} sector {sector_index} trailer",
            }
        )
    return entries


def extract_sector_keys(data):
    sector_keys = {}
    total_blocks = len(data) // 16
    for block_index in range(total_blocks):
        if not is_sector_trailer(block_index):
            continue
        start = block_index * 16
        trailer = data[start : start + 16]
        if len(trailer) < 16:
            continue
        sector_keys[block_index // 4] = {
            "A": trailer[:6].hex().upper(),
            "B": trailer[10:16].hex().upper(),
        }
    return sector_keys


def unique_key_entries(entries):
    unique = []
    seen = set()
    for entry in entries:
        try:
            key_hex = normalize_key_hex(entry["key"])
        except Exception:
            continue
        key_type = entry.get("type", "A").upper()
        if key_type not in {"A", "B"}:
            continue
        signature = (key_hex, key_type)
        if signature in seen:
            continue
        seen.add(signature)
        unique.append(
            {
                "key": key_hex,
                "type": key_type,
                "source": str(entry.get("source", "手动添加")),
            }
        )
    return unique


def load_key_library(path):
    default_entries = [{"key": "FFFFFFFFFFFF", "type": "A", "source": "默认密钥"}]
    if not os.path.exists(path):
        entries = unique_key_entries(default_entries)
        save_key_library(path, entries)
        return entries

    try:
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except Exception:
        entries = unique_key_entries(default_entries)
        save_key_library(path, entries)
        return entries

    if isinstance(payload, dict):
        raw_entries = payload.get("keys", [])
    elif isinstance(payload, list):
        raw_entries = payload
    else:
        raw_entries = []

    entries = unique_key_entries(default_entries + raw_entries)
    save_key_library(path, entries)
    return entries


def save_key_library(path, entries):
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"keys": unique_key_entries(entries)}, f, ensure_ascii=False, indent=2)


def build_frame(data):
    length = len(data) + 1
    lcs = (-length) & 0xFF
    body = bytes([PN532_HOST_TFI]) + bytes(data)
    dcs = (-sum(body)) & 0xFF
    return PN532_PREAMBLE + bytes([length, lcs]) + body + bytes([dcs]) + PN532_POSTAMBLE


def read_exact(ser, size, timeout=2.0):
    deadline = time.time() + timeout
    buf = bytearray()
    while len(buf) < size and time.time() < deadline:
        chunk = ser.read(size - len(buf))
        if chunk:
            buf.extend(chunk)
        else:
            time.sleep(0.01)
    if len(buf) != size:
        raise TimeoutError(f"串口读取超时,期望 {size} 字节,实际 {len(buf)} 字节")
    return bytes(buf)


def read_response_frame(ser):
    ack = read_exact(ser, len(PN532_ACK))
    if ack != PN532_ACK:
        raise RuntimeError(f"PN532 ACK 无效: {ack.hex(' ').upper()}")

    header = read_exact(ser, 5)
    if header[:3] != PN532_PREAMBLE:
        raise RuntimeError(f"PN532 帧头无效: {header.hex(' ').upper()}")

    length = header[3]
    lcs = header[4]
    if ((length + lcs) & 0xFF) != 0:
        raise RuntimeError("PN532 长度校验失败")

    body = read_exact(ser, length + 2)
    payload = body[:length]
    dcs = body[length]
    postamble = body[length + 1]
    if ((sum(payload) + dcs) & 0xFF) != 0:
        raise RuntimeError("PN532 数据校验失败")
    if postamble != 0x00:
        raise RuntimeError("PN532 帧尾无效")
    if not payload or payload[0] != PN532_PN532_TFI:
        raise RuntimeError("PN532 响应 TFI 无效")
    return payload[1:]


def pn532_command(ser, data, expected_code=None):
    ser.reset_input_buffer()
    ser.write(build_frame(data))
    response = read_response_frame(ser)
    if expected_code is not None and (not response or response[0] != expected_code):
        actual = response[0] if response else None
        raise RuntimeError(f"PN532 响应命令不匹配,期望 {expected_code:02X},实际 {actual}")
    return response


def pn532_wakeup(ser):
    ser.write(bytes.fromhex("55 55 00 00 00 00 00 00 00 00 00 00 00 00 00 00 FF 03 FD D4 14 01 17 00"))
    time.sleep(0.2)
    try:
        ser.reset_input_buffer()
    except Exception:
        pass


def pn532_sam_configuration(ser):
    pn532_command(ser, [0x14, 0x01, 0x14, 0x01], expected_code=0x15)


def pn532_list_passive_target(ser):
    response = pn532_command(ser, [0x4A, 0x01, 0x00], expected_code=0x4B)
    if len(response) < 7 or response[1] < 1:
        raise RuntimeError("未检测到 Mifare 卡")
    uid_length_index = 6
    uid_length = response[uid_length_index]
    uid = bytes(response[uid_length_index + 1 : uid_length_index + 1 + uid_length])
    if len(uid) < 4:
        raise RuntimeError("卡 UID 长度异常")
    return response[2], uid


def build_uid_auth_candidates(uid):
    if len(uid) <= 4:
        return [uid[:4]]
    candidates = []
    lower4 = uid[-4:]
    upper4 = uid[:4]

    if upper4 != lower4:
        candidates.append(upper4)
    return candidates


def mifare_authenticate_once(ser, target_number, block_index, uid, key_hex, key_type):
    auth_cmd = MIFARE_AUTH_A if key_type == "A" else MIFARE_AUTH_B
    for auth_uid in build_uid_auth_candidates(uid):
        data = [0x40, target_number, auth_cmd, block_index]
        data.extend(bytes.fromhex(key_hex))
        data.extend(auth_uid)
        response = pn532_command(ser, data, expected_code=0x41)
        if len(response) >= 2 and response[1] == 0x00:
            return True
    return False


def mifare_write_block(ser, target_number, block_index, block_data):
    if len(block_data) != 16:
        raise ValueError("Mifare 单块必须是 16 字节")
    data = [0x40, target_number, MIFARE_WRITE_16, block_index]
    data.extend(block_data)
    response = pn532_command(ser, data, expected_code=0x41)
    if len(response) < 2 or response[1] != 0x00:
        raise RuntimeError(f"写块失败,块 {block_index}")


def append_debug_log(line):
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    with open(DEBUG_LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {line}\n")


class PCR532WriterApp:
    def __init__(self, root):
        self.root = root
        self.root.title("基于PCR532 IC卡一键定点写入器 冯学长制作")
        self.root.geometry("860x820")
        self.root.minsize(780, 720)

        self.file_data = None
        self.file_path = ""
        self.file_sector_keys = {}
        self.selected_port = tk.StringVar()
        self.status_var = tk.StringVar(value="准备就绪")
        self.progress_var = tk.DoubleVar(value=0)
        self.driver_check_running = False
        self.key_library_window = None
        self.key_tree = None
        self.key_type_var = tk.StringVar(value="A")
        self.key_count_var = tk.StringVar()

        self.key_library = load_key_library(KEY_LIBRARY_FILE)

        self.setup_ui()
        self.refresh_key_count()
        self.auto_check_driver()

    def setup_ui(self):
        header = tk.Frame(self.root, bg="#2c3e50", height=60)
        header.pack(fill="x")
        header.pack_propagate(False)
        tk.Label(
            header,
            text="IC卡一键写入工具",
            fg="white",
            bg="#2c3e50",
            font=("Microsoft YaHei UI", 16),
        ).pack(pady=15)

        dev_frame = tk.LabelFrame(self.root, text="1. 设备与驱动检查", padx=10, pady=10)
        dev_frame.pack(pady=10, padx=20, fill="x")

        self.port_combo = ttk.Combobox(dev_frame, textvariable=self.selected_port, state="readonly")
        self.port_combo.pack(side="left", expand=True, fill="x", padx=5)

        tk.Button(dev_frame, text="刷新/安装驱动", command=self.auto_check_driver).pack(side="right")

        file_frame = tk.LabelFrame(self.root, text="2. 选择写入文件 (.bin / .dump)", padx=10, pady=10)
        file_frame.pack(pady=10, padx=20, fill="x")

        self.file_label = tk.Label(file_frame, text="未选择文件", fg="gray", wraplength=540, justify="left")
        self.file_label.pack(side="left", expand=True, anchor="w")

        tk.Button(file_frame, text="浏览文件", command=self.load_file).pack(side="right")

        key_frame = tk.LabelFrame(self.root, text="3. 密钥库与认证", padx=10, pady=10)
        key_frame.pack(pady=10, padx=20, fill="x")

        self.key_count_label = tk.Label(key_frame, textvariable=self.key_count_var, fg="#2c3e50")
        self.key_count_label.pack(anchor="w", pady=(0, 8))

        key_button_row = tk.Frame(key_frame)
        key_button_row.pack(fill="x")

        tk.Button(key_button_row, text="管理密钥库", command=self.open_key_library_window).pack(side="left", padx=(0, 8))
        tk.Button(key_button_row, text="提取当前文件密钥到库", command=self.extract_keys_from_current_file).pack(side="left")

        exec_frame = tk.LabelFrame(self.root, text="4. 写卡操作", padx=10, pady=10)
        exec_frame.pack(pady=10, padx=20, fill="x")

        self.btn_write = tk.Button(
            exec_frame,
            text="开始写卡",
            bg="#27ae60",
            fg="white",
            font=("Microsoft YaHei UI", 12, "bold"),
            height=2,
            command=self.start_write_thread,
        )
        self.btn_write.pack(fill="x")

        self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(fill="x", padx=20, pady=5)

        self.lbl_status = tk.Label(self.root, textvariable=self.status_var, fg="#2980b9")
        self.lbl_status.pack(pady=5)

        info_frame = tk.LabelFrame(self.root, text="5. 文件数据预览(HEX)", padx=10, pady=10)
        info_frame.pack(pady=10, padx=20, fill="both", expand=True)

        self.file_meta_label = tk.Label(
            info_frame,
            text="长度: -\n路径: -",
            anchor="w",
            justify="left",
            fg="#2c3e50",
        )
        self.file_meta_label.pack(fill="x", pady=(0, 8))

        text_holder = tk.Frame(info_frame)
        text_holder.pack(fill="both", expand=True)

        y_scroll = ttk.Scrollbar(text_holder, orient="vertical")
        y_scroll.pack(side="right", fill="y")
        x_scroll = ttk.Scrollbar(info_frame, orient="horizontal")
        x_scroll.pack(fill="x")

        self.preview_text = tk.Text(
            text_holder,
            height=14,
            wrap="none",
            font=("Consolas", 10),
            state="disabled",
            bg="#f7f9fb",
            yscrollcommand=y_scroll.set,
            xscrollcommand=x_scroll.set,
        )
        self.preview_text.pack(side="left", fill="both", expand=True)
        y_scroll.config(command=self.preview_text.yview)
        x_scroll.config(command=self.preview_text.xview)

        tk.Label(
            self.root,
            text="说明:写入时会跳过 block 0 和每个扇区 trailer 块;认证时会自动遍历密钥库。",
            fg="#7f8c8d",
        ).pack(pady=(0, 8))

    def refresh_key_count(self):
        self.key_count_var.set(
            f"当前密钥数:{len(self.key_library)}  个,库文件:{os.path.basename(KEY_LIBRARY_FILE)}"
        )
        if self.key_tree is not None:
            self.refresh_key_tree()

    def save_key_library(self):
        save_key_library(KEY_LIBRARY_FILE, self.key_library)
        self.key_library = load_key_library(KEY_LIBRARY_FILE)
        self.refresh_key_count()

    def add_key_entries(self, entries):
        before = len(self.key_library)
        self.key_library = unique_key_entries(self.key_library + entries)
        self.save_key_library()
        return len(self.key_library) - before

    def open_key_library_window(self):
        if self.key_library_window is not None and self.key_library_window.winfo_exists():
            self.key_library_window.focus()
            return

        window = tk.Toplevel(self.root)
        window.title("密钥库")
        window.geometry("760x460")
        window.transient(self.root)
        self.key_library_window = window

        tree = ttk.Treeview(window, columns=("type", "key", "source"), show="headings", height=12)
        tree.heading("type", text="类型")
        tree.heading("key", text="密钥")
        tree.heading("source", text="来源")
        tree.column("type", width=80, anchor="center")
        tree.column("key", width=160, anchor="center")
        tree.column("source", width=460, anchor="w")
        tree.pack(fill="both", expand=True, padx=12, pady=(12, 8))
        self.key_tree = tree
        self.refresh_key_tree()

        form = tk.LabelFrame(window, text="手动添加密钥", padx=10, pady=10)
        form.pack(fill="x", padx=12, pady=(0, 8))

        tk.Label(form, text="密钥").pack(side="left")
        self.key_entry_widget = tk.Entry(form, width=22)
        self.key_entry_widget.pack(side="left", padx=(8, 8))

        tk.Label(form, text="类型").pack(side="left")
        key_type_combo = ttk.Combobox(form, width=8, state="readonly", textvariable=self.key_type_var)
        key_type_combo["values"] = ("A", "B", "Both")
        key_type_combo.pack(side="left", padx=(8, 8))
        key_type_combo.current(0)

        tk.Button(form, text="添加到库", command=self.add_manual_key).pack(side="left")

        action_row = tk.Frame(window)
        action_row.pack(fill="x", padx=12, pady=(0, 12))

        tk.Button(action_row, text="删除选中密钥", command=self.delete_selected_keys).pack(side="left")
        tk.Button(action_row, text="关闭", command=window.destroy).pack(side="right")

    def refresh_key_tree(self):
        if self.key_tree is None or not self.key_tree.winfo_exists():
            return
        self.key_tree.delete(*self.key_tree.get_children())
        for entry in self.key_library:
            iid = f"{entry['type']}:{entry['key']}"
            self.key_tree.insert("", "end", iid=iid, values=(entry["type"], entry["key"], entry["source"]))

    def add_manual_key(self):
        raw_key = self.key_entry_widget.get().strip()
        try:
            key_hex = normalize_key_hex(raw_key)
        except ValueError as e:
            messagebox.showwarning("格式错误", str(e))
            return

        selected_type = self.key_type_var.get().strip() or "A"
        entries = []
        if selected_type == "Both":
            entries.append({"key": key_hex, "type": "A", "source": "手动添加"})
            entries.append({"key": key_hex, "type": "B", "source": "手动添加"})
        else:
            entries.append({"key": key_hex, "type": selected_type, "source": "手动添加"})

        added = self.add_key_entries(entries)
        self.key_entry_widget.delete(0, tk.END)
        if added:
            messagebox.showinfo("密钥库", f"已新增 {added} 个密钥")
        else:
            messagebox.showinfo("密钥库", "密钥已存在,无新增内容")

    def delete_selected_keys(self):
        if self.key_tree is None:
            return
        selected = self.key_tree.selection()
        if not selected:
            messagebox.showwarning("提示", "请先选择要删除的密钥")
            return

        selected_pairs = set()
        for item_id in selected:
            values = self.key_tree.item(item_id, "values")
            if len(values) >= 2:
                selected_pairs.add((values[1], values[0]))

        self.key_library = [
            entry
            for entry in self.key_library
            if (entry["key"], entry["type"]) not in selected_pairs
        ]
        self.save_key_library()

    def extract_keys_from_current_file(self):
        if not self.file_data:
            messagebox.showwarning("提示", "请先加载一个 .bin 或 .dump 文件")
            return

        source_name = os.path.basename(self.file_path) or "当前文件"
        extracted = extract_keys_from_dump(self.file_data, source_name)
        added = self.add_key_entries(extracted)
        messagebox.showinfo(
            "提取完成",
            f"已从 {source_name} 的扇区 trailer 中提取 {len(extracted)} 个候选密钥。\n"
            f"新增入库 {added} 个唯一密钥。",
        )

    def set_ports(self, port_list):
        self.port_combo["values"] = port_list
        if port_list:
            self.port_combo.current(0)

    def set_preview_text(self, content):
        self.preview_text.config(state="normal")
        self.preview_text.delete("1.0", tk.END)
        self.preview_text.insert("1.0", content)
        self.preview_text.config(state="disabled")

    def update_file_preview(self):
        if self.file_data is None:
            self.file_meta_label.config(text="长度: -\n路径: -")
            self.set_preview_text("无数据")
            return
        self.file_meta_label.config(text=f"长度: {len(self.file_data)} 字节\n路径: {self.file_path}")
        self.set_preview_text(format_hex_preview(self.file_data))

    def auto_check_driver(self):
        if self.driver_check_running:
            return
        self.driver_check_running = True
        self.status_var.set("正在检查驱动和串口...")
        threading.Thread(target=self._auto_check_driver_worker, daemon=True).start()

    def _auto_check_driver_worker(self):
        try:
            ports = list_serial_ports()
            if ports:
                port_list = [p.device for p in ports]
                self.root.after(0, lambda: self.set_ports(port_list))
                self.root.after(0, lambda: self.status_var.set(f"已发现 {len(port_list)} 个可用串口"))
                return

            if has_ch34x_device():
                self.root.after(0, lambda: self.status_var.set("已检测到 CH340/CH341 设备,请重新插拔后点击刷新"))
                return

            self.root.after(0, lambda: self.status_var.set("未检测到驱动,正在联网下载安装..."))
            self.install_driver_online()

            refreshed_ports = list_serial_ports()
            refreshed_list = [p.device for p in refreshed_ports]
            self.root.after(0, lambda: self.set_ports(refreshed_list))
            if refreshed_list:
                self.root.after(0, lambda: self.status_var.set("驱动安装完成,串口已刷新"))
            else:
                self.root.after(0, lambda: self.status_var.set("驱动安装完成,请重新插拔设备后点击刷新"))
        except PermissionError as e:
            self.root.after(0, lambda: self.status_var.set(str(e)))
            self.root.after(0, lambda: messagebox.showwarning("权限不足", str(e)))
        except Exception as e:
            self.root.after(0, lambda: self.status_var.set(f"驱动安装失败: {e}"))
            self.root.after(0, lambda: messagebox.showerror("驱动安装失败", f"联网安装驱动失败:{e}"))
        finally:
            self.driver_check_running = False

    def install_driver_online(self):
        if not is_admin():
            raise PermissionError("请以管理员身份运行程序,才能自动安装驱动")

        driver_path = os.path.join(tempfile.gettempdir(), DRIVER_EXE)
        self.root.after(0, lambda: self.status_var.set("正在下载 CH341 驱动..."))
        download_driver_package(driver_path)

        self.root.after(0, lambda: self.status_var.set("正在安装 CH341 驱动..."))
        subprocess.run([driver_path, "/S"], check=True)

    def load_file(self):
        path = filedialog.askopenfilename(filetypes=[("IC卡数据", "*.bin;*.dump"), ("所有文件", "*.*")])
        if not path:
            return

        with open(path, "rb") as f:
            self.file_data = f.read()
        self.file_path = path
        self.file_sector_keys = extract_sector_keys(self.file_data)

        if len(self.file_data) != 1024:
            messagebox.showwarning("警告", "文件大小不是 1024 字节。当前写入逻辑按 Mifare Classic 1K 处理。")

        self.file_label.config(text=os.path.basename(path), fg="black")
        self.update_file_preview()
        self.status_var.set("文件加载成功")

    def build_auth_candidates(self, block_index, cached_entry=None):
        candidates = []
        seen = set()
        sector_index = block_index // 4

        sector_keys = self.file_sector_keys.get(sector_index)
        if sector_keys:
            for key_type in ("A", "B"):
                key_hex = sector_keys.get(key_type)
                if not key_hex:
                    continue
                signature = (key_hex, key_type)
                if signature in seen:
                    continue
                seen.add(signature)
                candidates.append(
                    {
                        "key": key_hex,
                        "type": key_type,
                        "source": f"当前文件 sector {sector_index}",
                    }
                )

        if cached_entry is not None:
            signature = (cached_entry["key"], cached_entry["type"])
            if signature not in seen:
                seen.add(signature)
                candidates.append(cached_entry)

        for entry in self.key_library:
            signature = (entry["key"], entry["type"])
            if signature in seen:
                continue
            seen.add(signature)
            candidates.append(entry)
        return candidates

    def authenticate_block_with_library(self, ser, target_number, block_index, uid, cached_entry=None):
        candidates = self.build_auth_candidates(block_index, cached_entry)
        for entry in candidates:
            append_debug_log(
                f"AUTH TRY block={block_index} uid={uid.hex().upper()} type={entry['type']} "
                f"key={entry['key']} source={entry.get('source', '-')}"
            )
            if mifare_authenticate_once(
                ser,
                target_number,
                block_index,
                uid,
                entry["key"],
                entry["type"],
            ):
                append_debug_log(
                    f"AUTH OK block={block_index} uid={uid.hex().upper()} "
                    f"type={entry['type']} key={entry['key']} source={entry.get('source', '-')}"
                )
                return entry
        append_debug_log(
            f"AUTH FAIL block={block_index} uid={uid.hex().upper()} attempts={len(candidates)}"
        )
        raise RuntimeError(
            f"块 {block_index} 认证失败,UID={uid.hex().upper()},已尝试 {len(candidates)} 个密钥"
        )

    def start_write_thread(self):
        if not self.file_data:
            messagebox.showwarning("提示", "请先选择数据文件")
            return
        if not self.selected_port.get():
            messagebox.showwarning("提示", "请先选择设备端口")
            return

        self.btn_write.config(state="disabled")
        self.progress_var.set(0)
        threading.Thread(target=self.write_logic, daemon=True).start()

    def write_logic(self):
        port = self.selected_port.get()
        ser = None
        sector_key_cache = {}
        try:
            append_debug_log("========== WRITE SESSION START ==========")
            append_debug_log(f"PORT={port}")
            append_debug_log(f"FILE={self.file_path}")
            self.status_var.set("正在初始化 PN532...")
            ser = serial.Serial(port, BAUD_RATE, timeout=1, write_timeout=1)

            pn532_wakeup(ser)
            pn532_sam_configuration(ser)

            self.status_var.set("正在寻卡,请将 IC 卡放置在感应区...")
            target_number, uid = pn532_list_passive_target(ser)
            uid_hex = uid.hex().upper()
            append_debug_log(f"TARGET={target_number} UID={uid_hex} UID_LEN={len(uid)}")

            total_blocks = len(self.file_data) // 16
            writable_blocks = [
                block_index
                for block_index in range(total_blocks)
                if block_index != 0 and not is_sector_trailer(block_index)
            ]
            if not writable_blocks:
                raise RuntimeError("文件中没有可写入的数据块")

            for index, block_index in enumerate(writable_blocks, 1):
                sector_index = block_index // 4
                start = block_index * 16
                block_data = self.file_data[start : start + 16]
                if len(block_data) < 16:
                    block_data = block_data.ljust(16, b"\x00")

                self.status_var.set(
                    f"UID {uid_hex},正在认证并写入块 {block_index} / {writable_blocks[-1]}..."
                )
                used_key = self.authenticate_block_with_library(
                    ser,
                    target_number,
                    block_index,
                    uid,
                    cached_entry=sector_key_cache.get(sector_index),
                )
                sector_key_cache[sector_index] = used_key
                mifare_write_block(ser, target_number, block_index, block_data)
                append_debug_log(
                    f"WRITE OK block={block_index} sector={sector_index} "
                    f"key={used_key['key']} type={used_key['type']}"
                )
                self.progress_var.set(index / len(writable_blocks) * 100)

            self.status_var.set("写入成功")
            append_debug_log("WRITE SUCCESS")
            messagebox.showinfo(
                "完成",
                "数据写入成功。\n认证过程已自动遍历密钥库,并跳过控制块以避免写坏卡。",
            )
        except Exception as e:
            append_debug_log(f"WRITE ERROR {type(e).__name__}: {e}")
            self.status_var.set(f"错误: {e}")
            messagebox.showerror(
                "写入失败",
                f"写卡过程中出错:{e}\n\n调试日志:{DEBUG_LOG_FILE}",
            )
        finally:
            if ser is not None:
                try:
                    ser.close()
                except Exception:
                    pass
            append_debug_log("========== WRITE SESSION END ==========")
            self.btn_write.config(state="normal")


if __name__ == "__main__":
    root = tk.Tk()
    app = PCR532WriterApp(root)
    root.mainloop()
屏幕截图 2026-04-29 175740.png
屏幕截图 2026-04-29 180302.png
屏幕截图 2026-04-29 180343.png

免费评分

参与人数 1吾爱币 +7 热心值 +1 收起 理由
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

苏紫方璇 发表于 2026-4-30 16:43
代码插入请参考板块置顶帖
【公告】发帖代码插入以及添加链接教程(有福利)
https://www.52pojie.cn/thread-713042-1-1.html
(出处: 吾爱破解论坛)
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-5-13 06:12

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表