# Forum code-viewer residue ("[Python] plain-text view / copy code"); not part of the program.
import json
import os
import time
import ctypes
import threading
import subprocess
import tempfile
import urllib.request
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import serial
import serial.tools.list_ports
# Files kept next to this script: the persisted key library and the write log.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
KEY_LIBRARY_FILE = os.path.join(BASE_DIR, "key_library.json")
DEBUG_LOG_FILE = os.path.join(BASE_DIR, "write_debug.log")
# File name and download location used by the automatic driver installation.
DRIVER_EXE = "CH341SER.EXE"
DRIVER_URL = "https://www.wch.cn/downloads/file/65.html?time=2023-03-16%2022:57:59"
# Baud rate used when the reader's serial port is opened.
BAUD_RATE = 115200
# PN532 framing: start-of-frame bytes, trailing byte, and the ACK frame that
# must precede every response.
PN532_PREAMBLE = bytes.fromhex("00 00 FF")
PN532_POSTAMBLE = b"\x00"
PN532_ACK = bytes.fromhex("00 00 FF 00 FF 00")
PN532_HOST_TFI = 0xD4  # first body byte of every frame sent to the PN532
PN532_PN532_TFI = 0xD5  # first payload byte required in frames read from the PN532
# Mifare command bytes carried inside the PN532 0x40 command.
MIFARE_AUTH_A = 0x60
MIFARE_AUTH_B = 0x61
MIFARE_WRITE_16 = 0xA0
def is_admin():
    """Report whether the current process has administrator rights.

    Returns whatever shell32's ``IsUserAnAdmin`` call yields, or ``False``
    when the call cannot be made (for example when ``ctypes.windll`` is not
    available).
    """
    try:
        shell32 = ctypes.windll.shell32
        elevated = shell32.IsUserAnAdmin()
    except Exception:
        elevated = False
    return elevated
def list_serial_ports():
    """Return a list snapshot of the ports reported by ``serial.tools.list_ports.comports()``."""
    return [*serial.tools.list_ports.comports()]
def has_ch34x_device():
    """Ask PowerShell whether a present PnP device looks like a CH340/CH341 adapter.

    Runs ``Get-PnpDevice`` and keeps the first friendly name matching the
    CH340/CH341/USB-serial pattern.

    Returns:
        ``True`` when PowerShell printed a matching name; ``False`` when it
        printed nothing or the query could not be run (missing PowerShell,
        timeout, ...).
    """
    ps_script = (
        "Get-PnpDevice -PresentOnly | "
        "Where-Object { $_.FriendlyName -match 'CH340|CH341|USB-SERIAL CH340|USB Serial' } | "
        "Select-Object -First 1 -ExpandProperty FriendlyName"
    )
    command = ["powershell", "-NoProfile", "-Command", ps_script]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=8,
            check=False,
        )
        first_match = completed.stdout.strip()
    except Exception:
        # Any failure to run the query is treated as "no device found".
        return False
    return bool(first_match)
def download_driver_package(target_path):
    """Download ``DRIVER_URL`` and store the response body at *target_path*.

    The whole body is read into memory before the target file is opened, so a
    failed download does not leave a partial file behind.

    Args:
        target_path: Destination path; an existing file is overwritten.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    req = urllib.request.Request(DRIVER_URL, headers=browser_headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        payload = resp.read()
    with open(target_path, "wb") as out_file:
        out_file.write(payload)
def normalize_key_hex(text):
    """Reduce *text* to a 12-digit upper-case hex key.

    Every character that is not a hex digit (spaces, colons, ...) is dropped
    before the length check.

    Args:
        text: User- or file-supplied key text.

    Returns:
        The 12 hex digits in upper case.

    Raises:
        ValueError: If the text does not contain exactly 12 hex digits.
    """
    hex_digits = set("0123456789abcdefABCDEF")
    digits = [ch for ch in text if ch in hex_digits]
    if len(digits) != 12:
        raise ValueError("密钥必须是 12 位十六进制,例如 FFFFFFFFFFFF")
    return "".join(digits).upper()
def format_hex_preview(data, bytes_per_line=16):
    """Render *data* as hex rows, each prefixed with its 4-digit hex offset.

    Args:
        data: Bytes-like object to display.
        bytes_per_line: Number of bytes shown on each row.

    Returns:
        The rows joined by newlines, or the "no data" placeholder text when
        there is nothing to show.
    """
    rows = [
        f"{start:04X} "
        + " ".join(f"{value:02X}" for value in data[start : start + bytes_per_line])
        for start in range(0, len(data), bytes_per_line)
    ]
    if not rows:
        return "无数据"
    return "\n".join(rows)
def is_sector_trailer(block_index):
    """Return ``True`` when *block_index* is the last block of a 4-block sector."""
    return block_index % 4 == 3
def extract_keys_from_dump(data, source_name):
    """List the key A / key B candidates stored in every sector trailer of a dump.

    Args:
        data: Raw dump bytes, laid out as consecutive 16-byte blocks.
        source_name: Label used in each entry's ``source`` text.

    Returns:
        A list of ``{"key", "type", "source"}`` dicts: for each sector in
        ascending order, first the "A" entry, then the "B" entry.
    """
    # extract_sector_keys already walks the trailer blocks in sector order.
    return [
        {
            "key": keys[key_type],
            "type": key_type,
            "source": f"{source_name} sector {sector_index} trailer",
        }
        for sector_index, keys in extract_sector_keys(data).items()
        for key_type in ("A", "B")
    ]
def extract_sector_keys(data):
    """Map each sector index to the keys found in that sector's trailer block.

    Args:
        data: Raw dump bytes, laid out as consecutive 16-byte blocks.

    Returns:
        ``{sector_index: {"A": hex, "B": hex}}`` where key A is bytes 0-5 and
        key B is bytes 10-15 of the trailer, both as upper-case hex.
    """
    trailers = (
        (block_index // 4, data[block_index * 16 : block_index * 16 + 16])
        for block_index in range(len(data) // 16)
        if is_sector_trailer(block_index)
    )
    return {
        sector_index: {
            "A": trailer[:6].hex().upper(),
            "B": trailer[10:16].hex().upper(),
        }
        for sector_index, trailer in trailers
        if len(trailer) >= 16
    }
def unique_key_entries(entries):
    """Normalise key entries and drop invalid or duplicate ones.

    An entry is kept when its ``key`` normalises to 12 hex digits and its
    ``type`` (default "A") is "A" or "B" in any letter case. Entries are
    de-duplicated on ``(key, type)``; the first occurrence wins.

    Args:
        entries: Iterable of entry dicts. Malformed items (not a mapping,
            missing key, non-string type, ...) are skipped instead of raising,
            so a hand-edited library file cannot crash the caller.

    Returns:
        A new list of ``{"key", "type", "source"}`` dicts in first-seen order.
    """
    unique = []
    seen = set()
    for entry in entries:
        try:
            key_hex = normalize_key_hex(entry["key"])
            key_type = entry.get("type", "A").upper()
        except (AttributeError, KeyError, TypeError, ValueError):
            # Not a mapping, no usable key, or a type that is not a string.
            continue
        if key_type not in {"A", "B"}:
            continue
        signature = (key_hex, key_type)
        if signature in seen:
            continue
        seen.add(signature)
        unique.append(
            {
                "key": key_hex,
                "type": key_type,
                "source": str(entry.get("source", "手动添加")),
            }
        )
    return unique
def load_key_library(path):
    """Load the key library from *path*, always including the default key.

    The file may hold either ``{"keys": [...]}`` or a bare list of entries.
    A missing, unreadable or malformed file is treated as an empty library.
    The normalised result is written back to *path* before returning.

    Args:
        path: Location of the JSON key library.

    Returns:
        The de-duplicated entry list, default key first.

    Raises:
        OSError: If the normalised library cannot be written back.
    """
    default_entries = [{"key": "FFFFFFFFFFFF", "type": "A", "source": "默认密钥"}]
    raw_entries = []
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                payload = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            payload = None
        if isinstance(payload, dict):
            payload = payload.get("keys", [])
        # Anything that is not a list (including a non-list "keys" value) is
        # ignored rather than concatenated, which used to raise TypeError.
        if isinstance(payload, list):
            raw_entries = payload
    entries = unique_key_entries(default_entries + raw_entries)
    save_key_library(path, entries)
    return entries
def save_key_library(path, entries):
    """Write *entries* to *path* as ``{"keys": [...]}`` in UTF-8 JSON.

    Entries are normalised with ``unique_key_entries`` and serialised before
    the file is opened, so a failure in either step no longer truncates an
    existing library file.

    Args:
        path: Destination JSON file.
        entries: Key entries to persist.
    """
    serialized = json.dumps(
        {"keys": unique_key_entries(entries)},
        ensure_ascii=False,
        indent=2,
    )
    with open(path, "w", encoding="utf-8") as f:
        f.write(serialized)
def build_frame(data):
    """Wrap command bytes in a host-to-PN532 frame.

    Layout: preamble, length, length checksum, TFI + data, data checksum,
    postamble. Both checksums are chosen so that the covered bytes plus the
    checksum sum to zero modulo 256.

    Args:
        data: Command byte values (command code followed by its parameters).

    Returns:
        The complete frame as ``bytes``.
    """
    length = len(data) + 1  # the TFI byte counts towards the length
    body = bytes((PN532_HOST_TFI, *data))
    header = bytes((length, -length & 0xFF))
    checksum = bytes((-sum(body) & 0xFF,))
    return b"".join((PN532_PREAMBLE, header, body, checksum, PN532_POSTAMBLE))
def read_exact(ser, size, timeout=2.0):
    """Read exactly *size* bytes from *ser* or fail after *timeout* seconds.

    Args:
        ser: Object with a ``read(n)`` method returning up to ``n`` bytes
            (empty when nothing is available yet).
        size: Number of bytes required.
        timeout: Overall time budget in seconds.

    Returns:
        The *size* bytes read.

    Raises:
        TimeoutError: If fewer than *size* bytes arrived before the deadline.
    """
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    buf = bytearray()
    while len(buf) < size and time.monotonic() < deadline:
        chunk = ser.read(size - len(buf))
        if chunk:
            buf.extend(chunk)
        else:
            # Nothing available yet; back off briefly instead of spinning.
            time.sleep(0.01)
    if len(buf) != size:
        raise TimeoutError(f"串口读取超时,期望 {size} 字节,实际 {len(buf)} 字节")
    return bytes(buf)
def read_response_frame(ser):
    """Read one ACK plus one response frame from the PN532 and validate it.

    Args:
        ser: Open serial port to read from.

    Returns:
        The response payload without its leading TFI byte (the first
        remaining byte is the response command code).

    Raises:
        RuntimeError: If the ACK, start bytes, length checksum, data checksum,
            postamble or TFI is not what is expected.
        TimeoutError: Propagated from ``read_exact`` when bytes are missing.
    """
    ack_frame = read_exact(ser, len(PN532_ACK))
    if ack_frame != PN532_ACK:
        raise RuntimeError(f"PN532 ACK 无效: {ack_frame.hex(' ').upper()}")

    # Header: 3 start bytes, then length and length checksum.
    header = read_exact(ser, 5)
    start_code, length, length_checksum = header[:3], header[3], header[4]
    if start_code != PN532_PREAMBLE:
        raise RuntimeError(f"PN532 帧头无效: {header.hex(' ').upper()}")
    if (length + length_checksum) % 256:
        raise RuntimeError("PN532 长度校验失败")

    # Remainder: `length` payload bytes, data checksum, postamble.
    tail = read_exact(ser, length + 2)
    payload = tail[:length]
    data_checksum = tail[length]
    postamble = tail[length + 1]
    if (sum(payload) + data_checksum) % 256:
        raise RuntimeError("PN532 数据校验失败")
    if postamble != 0x00:
        raise RuntimeError("PN532 帧尾无效")
    if not payload or payload[0] != PN532_PN532_TFI:
        raise RuntimeError("PN532 响应 TFI 无效")
    return payload[1:]
def pn532_command(ser, data, expected_code=None):
    """Send one command frame and return the validated response payload.

    Pending input is discarded first so that stale bytes cannot be mistaken
    for the reply.

    Args:
        ser: Open serial port.
        data: Command byte values passed to ``build_frame``.
        expected_code: When given, the first response byte must equal it.

    Returns:
        The response payload (command code first).

    Raises:
        RuntimeError: If the response is empty or starts with another code
            while *expected_code* is set, or if the frame itself is invalid.
    """
    ser.reset_input_buffer()
    ser.write(build_frame(data))
    response = read_response_frame(ser)
    if expected_code is None:
        return response
    actual = response[0] if response else None
    if actual != expected_code:
        raise RuntimeError(f"PN532 响应命令不匹配,期望 {expected_code:02X},实际 {actual}")
    return response
def pn532_wakeup(ser):
    """Send the wake-up byte sequence and discard whatever the reader answers.

    The sequence is ``55 55``, a run of zero bytes, and a framed 0x14 command
    (the same command code ``pn532_sam_configuration`` sends).

    Args:
        ser: Open serial port.
    """
    wake_sequence = bytes.fromhex(
        "55 55 00 00 00 00 00 00 00 00 00 00 00 00 00 00 FF 03 FD D4 14 01 17 00"
    )
    ser.write(wake_sequence)
    time.sleep(0.2)  # give the reader time to answer before flushing
    try:
        ser.reset_input_buffer()
    except Exception:
        # Best effort: a failed flush must not abort the wake-up.
        pass
def pn532_sam_configuration(ser):
    """Send the 0x14 configuration command and require a 0x15 response.

    NOTE(review): the parameter bytes 0x01, 0x14, 0x01 presumably select
    normal mode, a timeout value and IRQ use - confirm against the PN532
    user manual.

    Raises:
        RuntimeError: If the reader does not answer with code 0x15.
    """
    sam_request = [0x14, 0x01, 0x14, 0x01]
    pn532_command(ser, sam_request, expected_code=0x15)
def pn532_list_passive_target(ser):
    """Poll for one card and return its target number and UID.

    Args:
        ser: Open serial port.

    Returns:
        ``(target_number, uid)`` where ``target_number`` is response byte 2
        and ``uid`` is the ``bytes`` whose length is given by response byte 6.

    Raises:
        RuntimeError: If no target is reported or the UID is shorter than
            4 bytes.
    """
    response = pn532_command(ser, [0x4A, 0x01, 0x00], expected_code=0x4B)
    card_found = len(response) >= 7 and response[1] >= 1
    if not card_found:
        raise RuntimeError("未检测到 Mifare 卡")
    uid_start = 7  # the UID follows the length byte at index 6
    uid_end = uid_start + response[6]
    uid = bytes(response[uid_start:uid_end])
    if len(uid) < 4:
        raise RuntimeError("卡 UID 长度异常")
    return response[2], uid
def build_uid_auth_candidates(uid):
    """Return the 4-byte UID slices to try during Mifare authentication.

    A UID of at most 4 bytes is used as is. For longer UIDs the trailing
    4 bytes are tried first and the leading 4 bytes second (when they
    differ). Previously the trailing slice was computed but never returned,
    and the list came back empty when both slices were equal.

    NOTE(review): trying the trailing bytes first follows the usual
    convention for 7-byte UID cards - confirm against real hardware.

    Args:
        uid: Card UID as ``bytes``.

    Returns:
        A non-empty list of ``bytes`` candidates, in the order to try them.
    """
    if len(uid) <= 4:
        return [uid[:4]]
    lower4 = uid[-4:]
    upper4 = uid[:4]
    candidates = [lower4]
    if upper4 != lower4:
        candidates.append(upper4)
    return candidates
def mifare_authenticate_once(ser, target_number, block_index, uid, key_hex, key_type):
    """Try to authenticate one block with one key.

    Each UID slice from ``build_uid_auth_candidates`` is tried in turn until
    the reader reports status 0x00.

    Args:
        ser: Open serial port.
        target_number: Target number returned when the card was listed.
        block_index: Block to authenticate.
        uid: Card UID as ``bytes``.
        key_hex: 12 hex digits of the key.
        key_type: "A" selects key A; any other value selects key B.

    Returns:
        ``True`` on the first successful attempt, ``False`` if none succeeds.
    """
    auth_cmd = MIFARE_AUTH_A if key_type == "A" else MIFARE_AUTH_B
    for auth_uid in build_uid_auth_candidates(uid):
        request = [
            0x40,
            target_number,
            auth_cmd,
            block_index,
            *bytes.fromhex(key_hex),
            *auth_uid,
        ]
        response = pn532_command(ser, request, expected_code=0x41)
        authenticated = len(response) >= 2 and response[1] == 0x00
        if authenticated:
            return True
    return False
def mifare_write_block(ser, target_number, block_index, block_data):
    """Write one 16-byte block to an already authenticated card.

    Args:
        ser: Open serial port.
        target_number: Target number returned when the card was listed.
        block_index: Block to write.
        block_data: Exactly 16 bytes of data.

    Raises:
        ValueError: If *block_data* is not 16 bytes long.
        RuntimeError: If the reader does not report status 0x00.
    """
    if len(block_data) != 16:
        raise ValueError("Mifare 单块必须是 16 字节")
    request = [0x40, target_number, MIFARE_WRITE_16, block_index, *block_data]
    response = pn532_command(ser, request, expected_code=0x41)
    write_ok = len(response) >= 2 and response[1] == 0x00
    if not write_ok:
        raise RuntimeError(f"写块失败,块 {block_index}")
def append_debug_log(line):
    """Append *line*, prefixed with a local timestamp, to ``DEBUG_LOG_FILE``."""
    stamped = "[{}] {}\n".format(time.strftime("%Y-%m-%d %H:%M:%S"), line)
    with open(DEBUG_LOG_FILE, "a", encoding="utf-8") as log_file:
        log_file.write(stamped)
class PCR532WriterApp:
    """Tkinter application that writes a dump file to a Mifare card via a PN532 reader.

    The window has five areas: serial port / driver check, dump file
    selection, key library management, the write action, and a hex preview of
    the loaded file. The driver check and the card write run on daemon
    threads; every Tk variable, widget or message box they touch is queued
    onto the Tk event loop through ``_run_on_ui``.
    """

    def __init__(self, root):
        """Build the window on *root*, load the key library and start a driver check."""
        self.root = root
        self.root.title("基于PCR532 IC卡一键定点写入器 冯学长制作")
        self.root.geometry("860x820")
        self.root.minsize(780, 720)
        self.file_data = None
        self.file_path = ""
        self.file_sector_keys = {}
        self.selected_port = tk.StringVar()
        self.status_var = tk.StringVar(value="准备就绪")
        self.progress_var = tk.DoubleVar(value=0)
        self.driver_check_running = False
        # Widgets of the key library window; None while that window is closed.
        self.key_library_window = None
        self.key_tree = None
        self.key_entry_widget = None
        self.key_type_var = tk.StringVar(value="A")
        self.key_count_var = tk.StringVar()
        self.key_library = load_key_library(KEY_LIBRARY_FILE)
        self.setup_ui()
        self.refresh_key_count()
        self.auto_check_driver()

    def _run_on_ui(self, callback, *args, **kwargs):
        """Queue ``callback(*args, **kwargs)`` on the Tk event loop.

        Arguments are evaluated by the caller, so nothing is looked up later
        from a scope that may have changed (such as an ``except`` target).
        """
        self.root.after(0, lambda: callback(*args, **kwargs))

    def setup_ui(self):
        """Create and lay out all widgets of the main window."""
        header = tk.Frame(self.root, bg="#2c3e50", height=60)
        header.pack(fill="x")
        header.pack_propagate(False)
        tk.Label(
            header,
            text="IC卡一键写入工具",
            fg="white",
            bg="#2c3e50",
            font=("Microsoft YaHei UI", 16),
        ).pack(pady=15)

        # 1. Serial port selection and driver check.
        dev_frame = tk.LabelFrame(self.root, text="1. 设备与驱动检查", padx=10, pady=10)
        dev_frame.pack(pady=10, padx=20, fill="x")
        self.port_combo = ttk.Combobox(dev_frame, textvariable=self.selected_port, state="readonly")
        self.port_combo.pack(side="left", expand=True, fill="x", padx=5)
        tk.Button(dev_frame, text="刷新/安装驱动", command=self.auto_check_driver).pack(side="right")

        # 2. Dump file selection.
        file_frame = tk.LabelFrame(self.root, text="2. 选择写入文件 (.bin / .dump)", padx=10, pady=10)
        file_frame.pack(pady=10, padx=20, fill="x")
        self.file_label = tk.Label(file_frame, text="未选择文件", fg="gray", wraplength=540, justify="left")
        self.file_label.pack(side="left", expand=True, anchor="w")
        tk.Button(file_frame, text="浏览文件", command=self.load_file).pack(side="right")

        # 3. Key library.
        key_frame = tk.LabelFrame(self.root, text="3. 密钥库与认证", padx=10, pady=10)
        key_frame.pack(pady=10, padx=20, fill="x")
        self.key_count_label = tk.Label(key_frame, textvariable=self.key_count_var, fg="#2c3e50")
        self.key_count_label.pack(anchor="w", pady=(0, 8))
        key_button_row = tk.Frame(key_frame)
        key_button_row.pack(fill="x")
        tk.Button(key_button_row, text="管理密钥库", command=self.open_key_library_window).pack(
            side="left", padx=(0, 8)
        )
        tk.Button(
            key_button_row,
            text="提取当前文件密钥到库",
            command=self.extract_keys_from_current_file,
        ).pack(side="left")

        # 4. Write action, progress bar and status line.
        exec_frame = tk.LabelFrame(self.root, text="4. 写卡操作", padx=10, pady=10)
        exec_frame.pack(pady=10, padx=20, fill="x")
        self.btn_write = tk.Button(
            exec_frame,
            text="开始写卡",
            bg="#27ae60",
            fg="white",
            font=("Microsoft YaHei UI", 12, "bold"),
            height=2,
            command=self.start_write_thread,
        )
        self.btn_write.pack(fill="x")
        self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(fill="x", padx=20, pady=5)
        self.lbl_status = tk.Label(self.root, textvariable=self.status_var, fg="#2980b9")
        self.lbl_status.pack(pady=5)

        # 5. Read-only hex preview of the loaded file.
        info_frame = tk.LabelFrame(self.root, text="5. 文件数据预览(HEX)", padx=10, pady=10)
        info_frame.pack(pady=10, padx=20, fill="both", expand=True)
        self.file_meta_label = tk.Label(
            info_frame,
            text="长度: -\n路径: -",
            anchor="w",
            justify="left",
            fg="#2c3e50",
        )
        self.file_meta_label.pack(fill="x", pady=(0, 8))
        text_holder = tk.Frame(info_frame)
        text_holder.pack(fill="both", expand=True)
        y_scroll = ttk.Scrollbar(text_holder, orient="vertical")
        y_scroll.pack(side="right", fill="y")
        x_scroll = ttk.Scrollbar(info_frame, orient="horizontal")
        x_scroll.pack(fill="x")
        self.preview_text = tk.Text(
            text_holder,
            height=14,
            wrap="none",
            font=("Consolas", 10),
            state="disabled",
            bg="#f7f9fb",
            yscrollcommand=y_scroll.set,
            xscrollcommand=x_scroll.set,
        )
        self.preview_text.pack(side="left", fill="both", expand=True)
        y_scroll.config(command=self.preview_text.yview)
        x_scroll.config(command=self.preview_text.xview)
        tk.Label(
            self.root,
            text="说明:写入时会跳过 block 0 和每个扇区 trailer 块;认证时会自动遍历密钥库。",
            fg="#7f8c8d",
        ).pack(pady=(0, 8))

    def refresh_key_count(self):
        """Update the key count label and, if it is open, the key library table."""
        self.key_count_var.set(
            f"当前密钥数:{len(self.key_library)} 个,库文件:{os.path.basename(KEY_LIBRARY_FILE)}"
        )
        if self.key_tree is not None:
            self.refresh_key_tree()

    def save_key_library(self):
        """Persist the in-memory library, reload it from disk and refresh the UI.

        Reloading goes through ``load_key_library``, which always puts the
        default key back, so deleting the default key has no lasting effect.
        """
        save_key_library(KEY_LIBRARY_FILE, self.key_library)
        self.key_library = load_key_library(KEY_LIBRARY_FILE)
        self.refresh_key_count()

    def add_key_entries(self, entries):
        """Merge *entries* into the library and return how many were new."""
        before = len(self.key_library)
        self.key_library = unique_key_entries(self.key_library + entries)
        self.save_key_library()
        return len(self.key_library) - before

    def open_key_library_window(self):
        """Open the key library window, or focus it when it is already open."""
        if self.key_library_window is not None and self.key_library_window.winfo_exists():
            self.key_library_window.focus()
            return
        window = tk.Toplevel(self.root)
        window.title("密钥库")
        window.geometry("760x460")
        window.transient(self.root)
        # Route the title-bar close button through the same cleanup as "关闭".
        window.protocol("WM_DELETE_WINDOW", self._close_key_library_window)
        self.key_library_window = window

        tree = ttk.Treeview(window, columns=("type", "key", "source"), show="headings", height=12)
        tree.heading("type", text="类型")
        tree.heading("key", text="密钥")
        tree.heading("source", text="来源")
        tree.column("type", width=80, anchor="center")
        tree.column("key", width=160, anchor="center")
        tree.column("source", width=460, anchor="w")
        tree.pack(fill="both", expand=True, padx=12, pady=(12, 8))
        self.key_tree = tree
        self.refresh_key_tree()

        form = tk.LabelFrame(window, text="手动添加密钥", padx=10, pady=10)
        form.pack(fill="x", padx=12, pady=(0, 8))
        tk.Label(form, text="密钥").pack(side="left")
        self.key_entry_widget = tk.Entry(form, width=22)
        self.key_entry_widget.pack(side="left", padx=(8, 8))
        tk.Label(form, text="类型").pack(side="left")
        key_type_combo = ttk.Combobox(form, width=8, state="readonly", textvariable=self.key_type_var)
        key_type_combo["values"] = ("A", "B", "Both")
        key_type_combo.pack(side="left", padx=(8, 8))
        key_type_combo.current(0)
        tk.Button(form, text="添加到库", command=self.add_manual_key).pack(side="left")

        action_row = tk.Frame(window)
        action_row.pack(fill="x", padx=12, pady=(0, 12))
        tk.Button(action_row, text="删除选中密钥", command=self.delete_selected_keys).pack(side="left")
        tk.Button(action_row, text="关闭", command=self._close_key_library_window).pack(side="right")

    def _close_key_library_window(self):
        """Destroy the key library window and drop the references to its widgets."""
        window = self.key_library_window
        self.key_library_window = None
        self.key_tree = None
        self.key_entry_widget = None
        if window is not None and window.winfo_exists():
            window.destroy()

    def refresh_key_tree(self):
        """Rebuild the key table from ``self.key_library`` when the table exists."""
        if self.key_tree is None or not self.key_tree.winfo_exists():
            return
        self.key_tree.delete(*self.key_tree.get_children())
        for entry in self.key_library:
            iid = f"{entry['type']}:{entry['key']}"
            self.key_tree.insert("", "end", iid=iid, values=(entry["type"], entry["key"], entry["source"]))

    def add_manual_key(self):
        """Validate the typed key and add it as type A, B or both."""
        if self.key_entry_widget is None:
            return
        raw_key = self.key_entry_widget.get().strip()
        try:
            key_hex = normalize_key_hex(raw_key)
        except ValueError as e:
            messagebox.showwarning("格式错误", str(e))
            return
        selected_type = self.key_type_var.get().strip() or "A"
        key_types = ("A", "B") if selected_type == "Both" else (selected_type,)
        entries = [{"key": key_hex, "type": key_type, "source": "手动添加"} for key_type in key_types]
        added = self.add_key_entries(entries)
        self.key_entry_widget.delete(0, tk.END)
        if added:
            messagebox.showinfo("密钥库", f"已新增 {added} 个密钥")
        else:
            messagebox.showinfo("密钥库", "密钥已存在,无新增内容")

    def delete_selected_keys(self):
        """Remove the rows selected in the key table from the library."""
        if self.key_tree is None:
            return
        selected = self.key_tree.selection()
        if not selected:
            messagebox.showwarning("提示", "请先选择要删除的密钥")
            return
        selected_pairs = set()
        for item_id in selected:
            values = self.key_tree.item(item_id, "values")
            if len(values) >= 2:
                # Table columns are (type, key, source); the library is keyed on (key, type).
                selected_pairs.add((values[1], values[0]))
        self.key_library = [
            entry
            for entry in self.key_library
            if (entry["key"], entry["type"]) not in selected_pairs
        ]
        self.save_key_library()

    def extract_keys_from_current_file(self):
        """Add every key found in the loaded file's sector trailers to the library."""
        if not self.file_data:
            messagebox.showwarning("提示", "请先加载一个 .bin 或 .dump 文件")
            return
        source_name = os.path.basename(self.file_path) or "当前文件"
        extracted = extract_keys_from_dump(self.file_data, source_name)
        added = self.add_key_entries(extracted)
        messagebox.showinfo(
            "提取完成",
            f"已从 {source_name} 的扇区 trailer 中提取 {len(extracted)} 个候选密钥。\n"
            f"新增入库 {added} 个唯一密钥。",
        )

    def set_ports(self, port_list):
        """Show *port_list* in the port selector.

        The current selection is kept when it is still available; otherwise
        the first port is selected, or the selection is cleared when no port
        is left, so a vanished port cannot be used for writing.
        """
        self.port_combo["values"] = port_list
        if not port_list:
            self.selected_port.set("")
            return
        if self.selected_port.get() not in port_list:
            self.port_combo.current(0)

    def set_preview_text(self, content):
        """Replace the content of the read-only preview box."""
        self.preview_text.config(state="normal")
        self.preview_text.delete("1.0", tk.END)
        self.preview_text.insert("1.0", content)
        self.preview_text.config(state="disabled")

    def update_file_preview(self):
        """Refresh the length/path label and the hex preview for the loaded file."""
        if self.file_data is None:
            self.file_meta_label.config(text="长度: -\n路径: -")
            self.set_preview_text("无数据")
            return
        self.file_meta_label.config(text=f"长度: {len(self.file_data)} 字节\n路径: {self.file_path}")
        self.set_preview_text(format_hex_preview(self.file_data))

    def auto_check_driver(self):
        """Start the background port/driver check unless one is already running."""
        if self.driver_check_running:
            return
        self.driver_check_running = True
        self.status_var.set("正在检查驱动和串口...")
        threading.Thread(target=self._auto_check_driver_worker, daemon=True).start()

    def _auto_check_driver_worker(self):
        """Worker thread: list ports and install the driver when none is found.

        Error text is built inside the ``except`` block and passed as an
        argument, because the exception name is unbound once the block ends
        and the queued callback runs later.
        """
        try:
            port_list = [p.device for p in list_serial_ports()]
            if port_list:
                self._run_on_ui(self.set_ports, port_list)
                self._run_on_ui(self.status_var.set, f"已发现 {len(port_list)} 个可用串口")
                return
            if has_ch34x_device():
                self._run_on_ui(
                    self.status_var.set,
                    "已检测到 CH340/CH341 设备,请重新插拔后点击刷新",
                )
                return
            self._run_on_ui(self.status_var.set, "未检测到驱动,正在联网下载安装...")
            self.install_driver_online()
            refreshed_list = [p.device for p in list_serial_ports()]
            self._run_on_ui(self.set_ports, refreshed_list)
            if refreshed_list:
                self._run_on_ui(self.status_var.set, "驱动安装完成,串口已刷新")
            else:
                self._run_on_ui(self.status_var.set, "驱动安装完成,请重新插拔设备后点击刷新")
        except PermissionError as e:
            message = str(e)
            self._run_on_ui(self.status_var.set, message)
            self._run_on_ui(messagebox.showwarning, "权限不足", message)
        except Exception as e:
            self._run_on_ui(self.status_var.set, f"驱动安装失败: {e}")
            self._run_on_ui(messagebox.showerror, "驱动安装失败", f"联网安装驱动失败:{e}")
        finally:
            self.driver_check_running = False

    def install_driver_online(self):
        """Download the driver installer to the temp directory and run it.

        Called from the driver-check worker thread.

        Raises:
            PermissionError: If the process is not running as administrator.
            subprocess.CalledProcessError: If the installer exits with an error.
        """
        if not is_admin():
            raise PermissionError("请以管理员身份运行程序,才能自动安装驱动")
        driver_path = os.path.join(tempfile.gettempdir(), DRIVER_EXE)
        self._run_on_ui(self.status_var.set, "正在下载 CH341 驱动...")
        # NOTE(review): DRIVER_URL ends in ".html" - confirm it really serves
        # the installer binary. The download is executed with administrator
        # rights and no integrity check.
        download_driver_package(driver_path)
        self._run_on_ui(self.status_var.set, "正在安装 CH341 驱动...")
        # "/S" is presumably the installer's silent switch - TODO confirm.
        subprocess.run([driver_path, "/S"], check=True)

    def load_file(self):
        """Ask for a dump file, load it and refresh the preview."""
        path = filedialog.askopenfilename(filetypes=[("IC卡数据", "*.bin;*.dump"), ("所有文件", "*.*")])
        if not path:
            return
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError as e:
            # Keep the previously loaded file when the new one cannot be read.
            messagebox.showerror("读取失败", f"无法读取文件:{e}")
            return
        self.file_data = data
        self.file_path = path
        self.file_sector_keys = extract_sector_keys(self.file_data)
        if len(self.file_data) != 1024:
            messagebox.showwarning("警告", "文件大小不是 1024 字节。当前写入逻辑按 Mifare Classic 1K 处理。")
        self.file_label.config(text=os.path.basename(path), fg="black")
        self.update_file_preview()
        self.status_var.set("文件加载成功")

    def build_auth_candidates(self, block_index, cached_entry=None):
        """Return the ordered, de-duplicated keys to try for *block_index*.

        Order: keys from the loaded file's trailer for that sector (A, then
        B), then *cached_entry* (the key that last worked for the sector),
        then every library entry.
        """
        candidates = []
        seen = set()
        sector_index = block_index // 4
        sector_keys = self.file_sector_keys.get(sector_index)
        if sector_keys:
            for key_type in ("A", "B"):
                key_hex = sector_keys.get(key_type)
                if not key_hex:
                    continue
                signature = (key_hex, key_type)
                if signature in seen:
                    continue
                seen.add(signature)
                candidates.append(
                    {
                        "key": key_hex,
                        "type": key_type,
                        "source": f"当前文件 sector {sector_index}",
                    }
                )
        if cached_entry is not None:
            signature = (cached_entry["key"], cached_entry["type"])
            if signature not in seen:
                seen.add(signature)
                candidates.append(cached_entry)
        for entry in self.key_library:
            signature = (entry["key"], entry["type"])
            if signature in seen:
                continue
            seen.add(signature)
            candidates.append(entry)
        return candidates

    def authenticate_block_with_library(self, ser, target_number, block_index, uid, cached_entry=None):
        """Authenticate *block_index* with the first candidate key that works.

        Every attempt and the outcome are written to the debug log.

        Returns:
            The key entry that authenticated the block.

        Raises:
            RuntimeError: If no candidate key authenticates the block.
        """
        candidates = self.build_auth_candidates(block_index, cached_entry)
        for entry in candidates:
            append_debug_log(
                f"AUTH TRY block={block_index} uid={uid.hex().upper()} type={entry['type']} "
                f"key={entry['key']} source={entry.get('source', '-')}"
            )
            if mifare_authenticate_once(
                ser,
                target_number,
                block_index,
                uid,
                entry["key"],
                entry["type"],
            ):
                append_debug_log(
                    f"AUTH OK block={block_index} uid={uid.hex().upper()} "
                    f"type={entry['type']} key={entry['key']} source={entry.get('source', '-')}"
                )
                return entry
        append_debug_log(
            f"AUTH FAIL block={block_index} uid={uid.hex().upper()} attempts={len(candidates)}"
        )
        raise RuntimeError(
            f"块 {block_index} 认证失败,UID={uid.hex().upper()},已尝试 {len(candidates)} 个密钥"
        )

    def start_write_thread(self):
        """Validate the inputs, disable the write button and start the write worker."""
        if not self.file_data:
            messagebox.showwarning("提示", "请先选择数据文件")
            return
        # Read the Tk variable here, on the UI thread, and hand the value over.
        port = self.selected_port.get()
        if not port:
            messagebox.showwarning("提示", "请先选择设备端口")
            return
        self.btn_write.config(state="disabled")
        self.progress_var.set(0)
        threading.Thread(target=self.write_logic, args=(port,), daemon=True).start()

    def write_logic(self, port=None):
        """Worker thread: write every data block of the loaded file to the card.

        Block 0 and all sector trailer blocks are skipped. Each remaining
        block is authenticated and then written. Progress, status text and
        message boxes are queued onto the Tk event loop.

        Args:
            port: Serial port name. When omitted it is read from the port
                selector, which is only safe on the UI thread.
        """
        if port is None:
            port = self.selected_port.get()
        data = self.file_data  # snapshot: a file loaded mid-write must not mix in
        ser = None
        sector_key_cache = {}
        try:
            append_debug_log("========== WRITE SESSION START ==========")
            append_debug_log(f"PORT={port}")
            append_debug_log(f"FILE={self.file_path}")
            self._run_on_ui(self.status_var.set, "正在初始化 PN532...")
            ser = serial.Serial(port, BAUD_RATE, timeout=1, write_timeout=1)
            pn532_wakeup(ser)
            pn532_sam_configuration(ser)
            self._run_on_ui(self.status_var.set, "正在寻卡,请将 IC 卡放置在感应区...")
            target_number, uid = pn532_list_passive_target(ser)
            uid_hex = uid.hex().upper()
            append_debug_log(f"TARGET={target_number} UID={uid_hex} UID_LEN={len(uid)}")
            total_blocks = len(data) // 16
            writable_blocks = [
                block_index
                for block_index in range(total_blocks)
                if block_index != 0 and not is_sector_trailer(block_index)
            ]
            if not writable_blocks:
                raise RuntimeError("文件中没有可写入的数据块")
            for index, block_index in enumerate(writable_blocks, 1):
                sector_index = block_index // 4
                start = block_index * 16
                # Always a full block: total_blocks only counts complete 16-byte blocks.
                block_data = data[start : start + 16]
                self._run_on_ui(
                    self.status_var.set,
                    f"UID {uid_hex},正在认证并写入块 {block_index} / {writable_blocks[-1]}...",
                )
                used_key = self.authenticate_block_with_library(
                    ser,
                    target_number,
                    block_index,
                    uid,
                    cached_entry=sector_key_cache.get(sector_index),
                )
                sector_key_cache[sector_index] = used_key
                mifare_write_block(ser, target_number, block_index, block_data)
                append_debug_log(
                    f"WRITE OK block={block_index} sector={sector_index} "
                    f"key={used_key['key']} type={used_key['type']}"
                )
                self._run_on_ui(self.progress_var.set, index / len(writable_blocks) * 100)
            self._run_on_ui(self.status_var.set, "写入成功")
            append_debug_log("WRITE SUCCESS")
            self._run_on_ui(
                messagebox.showinfo,
                "完成",
                "数据写入成功。\n认证过程已自动遍历密钥库,并跳过控制块以避免写坏卡。",
            )
        except Exception as e:
            append_debug_log(f"WRITE ERROR {type(e).__name__}: {e}")
            self._run_on_ui(self.status_var.set, f"错误: {e}")
            self._run_on_ui(
                messagebox.showerror,
                "写入失败",
                f"写卡过程中出错:{e}\n\n调试日志:{DEBUG_LOG_FILE}",
            )
        finally:
            try:
                if ser is not None:
                    try:
                        ser.close()
                    except Exception:
                        # Best effort: nothing useful can be done if closing fails.
                        pass
                append_debug_log("========== WRITE SESSION END ==========")
            finally:
                # Re-enable the button even when the log cannot be written.
                self._run_on_ui(self.btn_write.config, state="normal")
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it and hand control to the Tk event loop.
    root = tk.Tk()
    app = PCR532WriterApp(root)
    root.mainloop()