def unzip_kbf(input_files=None):
global tp_kbf_file_name
if input_files is None:
if global_constant.full_update or global_constant.sdcard_root_path is None:
input_file_abs_path = os.path.join(tp_path.parent.parent, 'f_input/')
else:
input_file_abs_path = os.path.join(global_constant.sdcard_root_path, '.mymoney', 'backup/')
else:
if global_constant.full_update or global_constant.sdcard_root_path is None:
input_file_abs_path = os.path.join(tp_path.parent.parent, 'f_input/', input_files)
else:
input_file_abs_path = os.path.join(global_constant.sdcard_root_path, '.mymoney', 'backup/', input_files)
if not os.path.exists(input_file_abs_path):
print(f'{input_file_abs_path}不存在')
raise FileNotFoundError(f'输入文件kbf不存在-->{input_file_abs_path}')
print(f'输入待解压kbf文件路径:{input_file_abs_path}')
if os.path.isdir(input_file_abs_path):
files = os.listdir(input_file_abs_path)
for f in files:
if f.endswith('.kbf'):
global tp_kbf_file_name
tp_kbf_file_name = f
zf = zipfile.ZipFile(os.path.join(input_file_abs_path, f))
zf.extractall(path=os.path.join(tp_path.parent.parent, 'f_input/'))
break
elif os.path.isfile(input_file_abs_path):
if input_files.lower().endswith('.kbf'):
tp_kbf_file_name = input_files
zf = zipfile.ZipFile(input_file_abs_path)
zf.extractall(path=os.path.join(tp_path.parent.parent, 'f_input/'))
pass
def ssj_kbf_sqlite_convert(input_file=None, output_file=None, convert=ConvertType.Exchanged):
"""
convert ssj data, after kbf unzip to sqlite,convert it to normal sqlite database file
:param convert: 0 means convert it auto, 1 kbf format to sqlite, 2 sqlite to kbf format
:param input_file: the mymoney.sqlite file path
:param output_file: the convert mymoney.sqlite file path
:return:
"""
if input_file is None:
input_file = os.path.join(tp_path.parent.parent, 'f_input/mymoney.sqlite')
if output_file is None:
output_file = os.path.join(tp_path.parent.parent, 'f_output/mymoney.sqlite')
sqlite_header = (0x53, 0x51, 0x4C, 0x69,
0x74, 0x65, 0x20, 0x66,
0x6F, 0x72, 0x6D, 0x61,
0x74, 0x20, 0x33, 0x0)
kbf_header = (0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0,
0x0, 0x46, 0xFF, 0x0)
if global_constant.print_repeat_data_info:
read_file_header(input_file)
if os.path.exists(output_file):
os.remove(output_file)
with open(input_file, mode='rb') as f:
with open(output_file, mode='wb') as fw:
data_buffer = f.read()
if data_buffer[0] == 0x53:
kbf2sqlite = False
print("当前为SQLite文件格式")
if data_buffer[0] == 0x00:
kbf2sqlite = True
print("当前为KBF文件格式")
write_buffer = bytearray(data_buffer)
index = 0
while index < len(kbf_header) and index < len(sqlite_header):
if convert == ConvertType.Exchanged:
if kbf2sqlite is True:
write_buffer[index] = sqlite_header[index]
else:
write_buffer[index] = kbf_header[index]
elif convert == ConvertType.SqliteToKbf and not kbf2sqlite:
write_buffer[index] = kbf_header[index]
elif convert == ConvertType.KbfToSqlite and kbf2sqlite:
write_buffer[index] = sqlite_header[index]
index = index + 1
fw.write(write_buffer)
pass
if global_constant.print_repeat_data_info:
read_file_header(output_file)
pass