import
os
import
tkinter as tk
from
tkinter
import
messagebox, filedialog
from
tkinter
import
ttk
import
threading
import
queue
import
traceback
from
typing
import
Optional,
Tuple
,
List
class
WxAutoExIm:
HEADER_FORMATS
=
{
1
: (
0x89
,
0x50
,
0x4E
,
'.png'
),
2
: (
0xFF
,
0xD8
,
0xFF
,
'.jpg'
),
3
: (
0x47
,
0x49
,
0x46
,
'.gif'
)
}
def
__init__(
self
, input_path:
str
, output_path:
str
, status_queue: queue.Queue):
self
.input_path
=
input_path
self
.output_path
=
output_path
self
.status_queue
=
status_queue
self
._create_directory(output_path)
self
.specific_folder
=
self
._detect_special_folder(input_path)
self
._log_init_status()
self
.converted_files
=
[]
self
._process_files()
def
_detect_special_folder(
self
, path:
str
)
-
>
bool
:
return
os.path.basename(path).count(
'-'
) >
0
def
_log_init_status(
self
):
if
self
.specific_folder:
msg
=
f
'检测到特定月份文件夹: {os.path.basename(self.input_path)},将只解密该文件夹内图片\n'
else
:
msg
=
'将解密整个图片库下的所有图片\n'
self
.status_queue.put(msg)
def
_create_directory(
self
, path:
str
):
try
:
os.makedirs(path, exist_ok
=
True
)
except
OSError as e:
self
.status_queue.put(f
"创建目录失败: {str(e)}\n"
)
raise
def
_process_files(
self
):
if
self
.specific_folder:
self
._process_single_folder()
else
:
self
._process_all_folders()
def
_process_single_folder(
self
):
folder_name
=
os.path.basename(
self
.input_path)
self
.status_queue.put(f
'正在处理文件夹: {folder_name}\n'
)
self
._batch_convert(
self
.input_path, folder_name)
def
_process_all_folders(
self
):
for
root, dirs, files
in
os.walk(
self
.input_path):
if
os.path.basename(root).count(
'-'
) >
0
:
folder_name
=
os.path.basename(root)
self
.status_queue.put(f
'处理文件夹: {folder_name}\n'
)
self
._batch_convert(root, folder_name)
def
_batch_convert(
self
, dir_path:
str
, prefix:
str
):
dat_files
=
self
._get_valid_files(dir_path)
if
not
dat_files:
self
.status_queue.put(
"未找到有效的.dat文件\n"
)
return
total
=
len
(dat_files)
for
idx, dat_file
in
enumerate
(dat_files,
1
):
self
._convert_single_file(dir_path, dat_file, prefix)
self
._update_progress(idx, total)
def
_convert_single_file(
self
, dir_path:
str
, dat_file:
str
, prefix:
str
):
try
:
src_path
=
os.path.join(dir_path, dat_file)
file_name
=
f
"{prefix}_{os.path.splitext(dat_file)[0]}"
self
._image_decode(src_path, file_name,
self
.output_path)
self
.converted_files.append(dat_file)
except
Exception as e:
self
.status_queue.put(f
"文件{dat_file}转换失败: {str(e)}\n"
)
def
_image_decode(
self
, src_path:
str
, dest_name:
str
, out_dir:
str
):
try
:
with
open
(src_path,
"rb"
) as f_src:
header
=
f_src.read(
3
)
xor_value, extension
=
self
._analyze_header(header)
dest_path
=
os.path.join(out_dir, f
"{dest_name}{extension}"
)
with
open
(dest_path,
"wb"
) as f_dest:
f_src.seek(
0
)
while
chunk :
=
f_src.read(
4096
):
f_dest.write(bytes(b ^ xor_value
for
b
in
chunk))
except
IOError as e:
self
.status_queue.put(f
"文件操作失败: {str(e)}\n"
)
raise
def
_analyze_header(
self
, header: bytes)
-
>
Tuple
[
int
,
str
]:
for
fmt_id, (h1, h2, h3, ext)
in
self
.HEADER_FORMATS.items():
xor_values
=
[header[
0
] ^ h1, header[
1
] ^ h2, header[
2
] ^ h3]
if
len
(
set
(xor_values))
=
=
1
:
return
xor_values[
0
], ext
return
0x00
,
'.dat'
def
_get_valid_files(
self
, dir_path:
str
)
-
>
List
[
str
]:
return
[f
for
f
in
os.listdir(dir_path)
if
f.endswith(
'.dat'
)
and
os.path.isfile(os.path.join(dir_path, f))]
def
_update_progress(
self
, current:
int
, total:
int
):
progress
=
int
((current
/
total)
*
100
)
self
.status_queue.put(f
'转换进度: {progress}%\n'
)
class
RootTk:
def
__init__(
self
, title:
str
):
self
.root
=
tk.Tk()
self
.root.title(title)
self
.root.geometry(
'720x450'
)
self
.root.resizable(
False
,
False
)
self
.status_queue
=
queue.Queue()
self
._init_ui()
self
._setup_queue_handler()
def
_init_ui(
self
):
input_frame
=
ttk.Frame(
self
.root, padding
=
10
)
input_frame.pack(fill
=
tk.X)
ttk.Label(input_frame, text
=
"微信图片路径:"
).pack(side
=
tk.LEFT)
self
.input_path
=
tk.StringVar()
ttk.Entry(input_frame, textvariable
=
self
.input_path, width
=
50
).pack(side
=
tk.LEFT, padx
=
5
)
ttk.Button(input_frame, text
=
"选择文件夹"
, command
=
self
._select_input_folder).pack(side
=
tk.LEFT)
output_frame
=
ttk.Frame(
self
.root, padding
=
10
)
output_frame.pack(fill
=
tk.X)
ttk.Label(output_frame, text
=
"保存路径:"
).pack(side
=
tk.LEFT)
self
.output_path
=
tk.StringVar()
ttk.Entry(output_frame, textvariable
=
self
.output_path, width
=
50
).pack(side
=
tk.LEFT, padx
=
5
)
ttk.Button(output_frame, text
=
"选择文件夹"
, command
=
self
._select_output_folder).pack(side
=
tk.LEFT)
btn_frame
=
ttk.Frame(
self
.root, padding
=
10
)
btn_frame.pack()
ttk.Button(btn_frame, text
=
"开始转换"
, command
=
self
._execute_task).pack(pady
=
10
)
status_frame
=
ttk.Frame(
self
.root)
status_frame.pack(fill
=
tk.BOTH, expand
=
True
, padx
=
10
, pady
=
5
)
self
.status_text
=
tk.Text(status_frame, wrap
=
tk.WORD, state
=
'normal'
)
scrollbar
=
ttk.Scrollbar(status_frame, command
=
self
.status_text.yview)
self
.status_text.configure(yscrollcommand
=
scrollbar.
set
)
self
.status_text.pack(side
=
tk.LEFT, fill
=
tk.BOTH, expand
=
True
)
scrollbar.pack(side
=
tk.RIGHT, fill
=
tk.Y)
def
_setup_queue_handler(
self
):
self
.root.after(
100
,
self
._process_queue)
def
_process_queue(
self
):
while
not
self
.status_queue.empty():
msg
=
self
.status_queue.get()
self
._safe_update_status(msg)
self
.root.after(
100
,
self
._process_queue)
def
_safe_update_status(
self
, message:
str
):
self
.status_text.configure(state
=
'normal'
)
self
.status_text.insert(tk.END, message)
self
.status_text.see(tk.END)
self
.status_text.configure(state
=
'disabled'
)
def
_select_input_folder(
self
):
if
path :
=
filedialog.askdirectory():
self
.input_path.
set
(path)
def
_select_output_folder(
self
):
if
path :
=
filedialog.askdirectory():
self
.output_path.
set
(path)
def
_execute_task(
self
):
if
not
self
._validate_paths():
return
self
._safe_update_status(
'开始转换任务...\n'
)
threading.Thread(
target
=
self
._execute_conversion,
daemon
=
True
).start()
def
_validate_paths(
self
)
-
>
bool
:
input_path
=
self
.input_path.get()
output_path
=
self
.output_path.get()
if
not
input_path
or
not
output_path:
messagebox.showwarning(
"路径错误"
,
"请先选择输入和输出路径"
)
return
False
if
not
os.path.isdir(input_path):
messagebox.showerror(
"路径错误"
,
"输入路径无效或不存在"
)
return
False
if
os.path.commonpath([input_path])
=
=
os.path.commonpath([output_path]):
messagebox.showerror(
"路径错误"
,
"输入和输出路径不能相同"
)
return
False
return
True
def
_execute_conversion(
self
):
try
:
WxAutoExIm(
self
.input_path.get(),
self
.output_path.get(),
self
.status_queue
)
self
.status_queue.put(
"转换任务完成!\n"
)
messagebox.showinfo(
"完成"
,
"图片转换任务已成功完成"
)
except
Exception as e:
error_msg
=
f
"错误发生: {traceback.format_exc()}\n"
self
.status_queue.put(error_msg)
messagebox.showerror(
"错误"
,
"转换过程中发生严重错误"
)
def
run(
self
):
self
.root.mainloop()
if
__name__
=
=
'__main__'
:
app
=
RootTk(
"微信图片解密工具 v2.0"
)
app.run()