本帖最后由 鼠八爷 于 2025-4-16 01:47 编辑
前言。
我有一个 42166594 行的数据文件,用电脑自带的记事本打不开(太慢了,导入别的软件也会卡死)。本想找个在线工具来分割,结果上传之后数据显示为空白。来论坛逛了一圈,下载了几个软件,发现也是卡死。
后来找到了大佬的帖子,TXT批量文本分割器Python - 吾爱破解 - 52pojie.cn
然后在评论上看到另一个大佬的优化版本
只不过他的版本是按文本大小来分割保存的,用于数据文件的话会分割不完整,于是我就在大佬的基础上加以修改,改成了按行数来分割。
已经打包成程序了:https://wwen.lanzout.com/ir4dr2tpe0zg
[Python] 纯文本查看 复制代码
import os
from math import ceil
# Banner text that main() prints once at start-up (blank line above and below
# the title are part of the value).
SPLIT_HEADER = "\n============== 文本文件行数分割工具 ==============\n"
def wait_exit():
    """Show the exit prompt and block until the user submits a line."""
    # input() returns only after Enter is pressed; the typed text is discarded.
    _ = input("\n按任意键退出程序...")
def get_valid_input(prompt, validation_func, error_msg):
    """Prompt repeatedly until the user's answer passes validation.

    Args:
        prompt: Text shown by ``input`` on every attempt.
        validation_func: Callable that receives the whitespace-stripped
            answer; a truthy result accepts it.
        error_msg: Message printed after each rejected answer.

    Returns:
        The first accepted answer, with surrounding whitespace removed.
    """
    answer = input(prompt).strip()
    while not validation_func(answer):
        print(f"{error_msg}")
        answer = input(prompt).strip()
    return answer
def validate_directory(path):
    """Return True when ``path`` refers to an existing directory."""
    is_directory = os.path.isdir(path)
    return is_directory
def validate_line_number(num_str):
    """Return True when ``num_str`` parses as an integer greater than zero.

    Text that ``int`` rejects with ``ValueError`` yields False.
    """
    try:
        value = int(num_str)
    except ValueError:
        return False
    return value > 0
def select_files(directory, recursive):
    """Collect the paths of the ``.txt`` files found in ``directory``.

    The extension is matched case-insensitively, so ``DATA.TXT`` is picked
    up as well as ``data.txt``.

    Args:
        directory: Directory to scan.
        recursive: When true, descend into every sub-directory via
            ``os.walk``; otherwise only the entries of ``directory`` itself
            are considered, and entries that are not regular files are
            ignored.

    Returns:
        A sorted list of paths, each built by joining the containing
        directory and the file name.
    """
    target_ext = '.txt'

    def has_target_ext(name):
        # Lower-case first so upper/mixed-case extensions are not skipped.
        return name.lower().endswith(target_ext)

    if recursive:
        file_list = [
            os.path.join(root, name)
            for root, _, names in os.walk(directory)
            for name in names
            if has_target_ext(name)
        ]
    else:
        candidates = (
            os.path.join(directory, name)
            for name in os.listdir(directory)
            if has_target_ext(name)
        )
        # A directory may itself be named "something.txt"; keep files only.
        file_list = [path for path in candidates if os.path.isfile(path)]

    # os.listdir/os.walk yield entries in arbitrary order; sort so the
    # listing and the processing order are reproducible.
    file_list.sort()
    return file_list
def split_file_by_lines(file_path, max_lines, encoding='utf-8'):
    """Split one text file into numbered parts of at most ``max_lines`` lines.

    The parts are written to a ``<name>_split`` directory next to the source
    file and are named ``<name>_partNNNN<ext>``, starting at 0000. Line
    endings are copied unchanged, so concatenating the parts reproduces the
    source text exactly.

    Args:
        file_path: Path of the file to split.
        max_lines: Maximum number of lines per part; must be at least 1.
        encoding: Encoding used both to read the source and write the parts.

    Returns:
        True when the file was split (an empty file yields zero parts),
        False when decoding failed or any other error occurred. Errors are
        reported on stdout instead of being raised.
    """
    from itertools import islice

    try:
        if max_lines < 1:
            raise ValueError("每个文件的最大行数必须为正整数")

        file_dir, file_name = os.path.split(file_path)
        file_base_name, file_ext = os.path.splitext(file_name)
        split_dir = os.path.join(file_dir, f"{file_base_name}_split")

        print(f"\n正在处理: {file_name}")
        # newline='' turns off newline translation, so "\r\n" / "\n" / "\r"
        # endings are handed over exactly as stored in the file.
        with open(file_path, 'r', encoding=encoding, newline='') as src:
            print("正在统计总行数...", end='', flush=True)
            # First pass: the total is needed for the progress percentage and
            # also decodes the whole file before any output is produced.
            total_lines = sum(1 for _ in src)
            src.seek(0)
            print(f"\r总行数统计完成: {total_lines} 行")

            # Created only after the decode pass succeeded, so an encoding
            # error does not leave an empty directory behind.
            os.makedirs(split_dir, exist_ok=True)

            file_index = 0
            current_line = 0
            # Each outer iteration pulls the first line of the next part;
            # islice then streams the rest of that part from the same
            # iterator, so at most one line is held in memory at a time.
            for first_line in src:
                split_name = f"{file_base_name}_part{file_index:04d}{file_ext}"
                split_path = os.path.join(split_dir, split_name)
                with open(split_path, 'w', encoding=encoding, newline='') as dst:
                    dst.write(first_line)
                    dst.writelines(islice(src, max_lines - 1))
                # Every part is full except possibly the last one.
                current_line = min(current_line + max_lines, total_lines)
                progress = current_line / total_lines * 100
                print(f"\r进度: {progress:.1f}% | 已生成: {split_name}", end='')
                file_index += 1

        print(f"\n完成拆分:共 {file_index} 个文件")
        return True
    except UnicodeDecodeError:
        print(f"\n编码错误:请尝试其他编码格式(如gbk)")
        return False
    except Exception as e:
        # Per-file boundary: report and let the caller continue with the
        # remaining files.
        print(f"\n处理失败:{str(e)}")
        return False
def main():
    """Run the interactive splitting session.

    Asks for the directory, whether to recurse, the maximum number of lines
    per part and the file encoding; lists the matching files; and, after
    confirmation, splits each of them and prints a summary. Every exit path,
    including Ctrl+C, ends with the exit prompt.
    """
    import codecs

    def is_known_encoding(name):
        # An empty answer means "use the default", so it is always accepted.
        if not name:
            return True
        try:
            codecs.lookup(name)
        except LookupError:
            return False
        return True

    print(SPLIT_HEADER)
    try:
        directory = get_valid_input(
            "1. 请输入要处理的目录路径:",
            validate_directory,
            "目录不存在,请重新输入"
        )
        # Strip before comparing so "y " or " Y" is not read as "no".
        recursive = input("2. 是否递归子目录?(y/n): ").strip().lower() == 'y'
        max_lines = int(get_valid_input(
            "3. 请输入每个文件的最大行数:",
            validate_line_number,
            "请输入有效的正整数"
        ))
        # Reject unknown codec names here rather than letting every file
        # fail later with the same error.
        encoding = get_valid_input(
            "4. 请输入文件编码(默认utf-8,回车就行):",
            is_known_encoding,
            "无法识别的编码格式,请重新输入"
        ) or 'utf-8'

        files = select_files(directory, recursive)
        if not files:
            print("没有找到符合条件的txt文件")
            wait_exit()
            return

        print("\n找到以下待处理文件:")
        for i, f in enumerate(files, 1):
            print(f"{i}. {os.path.basename(f)}")

        confirm = input("\n是否开始处理?(y/n): ").strip().lower()
        if confirm != 'y':
            print("操作已取消")
            wait_exit()
            return

        # split_file_by_lines returns a truthy value on success, so summing
        # the results counts the successfully processed files.
        success_count = sum(
            1 for file_path in files
            if split_file_by_lines(file_path, max_lines, encoding)
        )
        print(f"\n处理完成:共 {len(files)} 个文件,成功 {success_count} 个")
        wait_exit()
    except KeyboardInterrupt:
        print("\n操作已中断")
        wait_exit()
# Start the interactive session only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|