import os
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
def is_image_file(file):
try
:
with Image.open(file) as img:
return
img.format in [
'JPEG'
,
'PNG'
,
'BMP'
,
'GIF'
,
'TIFF'
]
except IOError:
return
False
def compress_image(input_file, output_file):
# 打开图像文件
with Image.open(input_file) as img:
# 获取图像的格式
file_format = img.format
try
:
# 保存压缩后的图像
img.save(output_file, format=file_format, optimize=True)
except OSError:
# 如果遇到缺少 EXIF 信息的情况,重新保存图像
img.save(output_file, format=file_format, optimize=True, exif=b
''
)
print(f
"图像已成功压缩: {output_file}"
)
def compress_images_in_folders(thread_count):
# 获取用户输入的文件夹路径
folder_path = input(
"请输入要压缩图像文件所在的文件夹路径:"
)
# 遍历文件夹,找到所有图像文件
image_files = []
for
root, _, files in os.walk(folder_path):
for
file in files:
input_file = os.path.join(root, file)
# 检查文件扩展名
_, ext = os.path.splitext(input_file)
if
ext.lower() in [
'.jpg'
,
'.jpeg'
,
'.png'
,
'.bmp'
,
'.gif'
,
'.tiff'
]:
if
is_image_file(input_file):
image_files.append(input_file)
# 使用线程池进行图像压缩
with ThreadPoolExecutor(max_workers=thread_count) as executor:
for
input_file in image_files:
# 生成压缩后的文件名
output_file = os.path.splitext(input_file)[0] +
"_compressed"
+ os.path.splitext(input_file)[1]
# 检查是否已存在同名文件
if
os.path.exists(output_file):
i = 1
while
os.path.exists(output_file):
output_file = os.path.splitext(input_file)[0] + f
"_compressed_{i}"
+ os.path.splitext(input_file)[1]
i += 1
executor.submit(compress_image, input_file, output_file)
if
__name__ ==
"__main__"
:
thread_count = multiprocessing.cpu_count()
print(f
"使用 {thread_count} 个线程进行图像压缩"
)
compress_images_in_folders(thread_count)