# [Python] 纯文本查看 复制代码  (forum-paste header, commented out so the file parses)
#!/usr/bin/env python3
"""
中文语音转写工具 - 基于 mlx-whisper(Apple Silicon 优化)
用法:python transcribe.py <音频文件> [选项]
"""
import argparse
import subprocess
import sys
import os
import json
import tempfile
import shutil
from pathlib import Path
import opencc
converter = opencc.OpenCC('t2s')  # Traditional -> Simplified Chinese converter, shared module-wide
def split_audio(input_path: str, chunk_minutes: int = 25) -> tuple[list[str], str]:
    """Split a large audio file into fixed-length chunks to limit memory pressure.

    Uses ffmpeg's segment muxer with stream copy (no re-encode), so splitting
    is lossless and fast. Chunks are written to a fresh temporary directory
    that the CALLER is responsible for deleting.

    Args:
        input_path: Path to the source audio file.
        chunk_minutes: Target length of each chunk, in minutes.

    Returns:
        A ``(chunk_paths, tmpdir)`` tuple: the sorted list of chunk file
        paths, and the temporary directory containing them.
        (Fix: the original annotation claimed ``list[str]`` but the function
        has always returned this 2-tuple, as the caller's unpacking shows.)

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    tmpdir = tempfile.mkdtemp(prefix="whisper_chunks_")
    output_pattern = os.path.join(tmpdir, "chunk_%03d.mp3")
    cmd = [
        # Global flags first (conventional ffmpeg ordering): overwrite
        # outputs without prompting, and keep the console quiet.
        "ffmpeg", "-y", "-loglevel", "error",
        "-i", input_path,
        "-f", "segment",
        "-segment_time", str(chunk_minutes * 60),
        "-c", "copy",               # stream copy: lossless split, very fast
        "-reset_timestamps", "1",   # each chunk restarts at t=0
        output_pattern,
    ]
    subprocess.run(cmd, check=True)
    chunks = sorted(Path(tmpdir).glob("chunk_*.mp3"))
    return [str(c) for c in chunks], tmpdir
def transcribe_file(audio_path: str, model: str, language: str = "zh") -> dict:
    """Transcribe one audio file with mlx-whisper and return its raw result dict."""
    import mlx_whisper  # deferred: heavy import, only needed at transcription time

    repo = f"mlx-community/whisper-{model}-mlx"
    return mlx_whisper.transcribe(
        audio_path,
        path_or_hf_repo=repo,
        language=language,
        word_timestamps=True,             # request word-level timestamps
        condition_on_previous_text=True,  # feed prior text as decoding context
        verbose=False,
    )
def _srt_timestamp(seconds: float) -> str:
    """Format a time in seconds as an SRT timestamp: ``HH:MM:SS,mmm``."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    sec = seconds % 60
    # SRT uses a comma as the millisecond separator.
    return f"{h:02d}:{m:02d}:{sec:06.3f}".replace(".", ",")


def format_srt(segments: list, offset_seconds: float = 0.0) -> str:
    """Render whisper ``segments`` as SRT subtitle text.

    Args:
        segments: Dicts with ``start``/``end`` (seconds) and ``text`` keys.
        offset_seconds: Constant shift applied to every timestamp.

    Returns:
        The full SRT document as a string (empty string for no segments).
    """
    # Fix: the timestamp formatter was an inner `def` re-created on every
    # loop iteration; it is now the module-level `_srt_timestamp` helper.
    lines = []
    for i, seg in enumerate(segments, 1):
        start = _srt_timestamp(seg["start"] + offset_seconds)
        end = _srt_timestamp(seg["end"] + offset_seconds)
        lines.append(f"{i}\n{start} --> {end}\n{seg['text'].strip()}\n")
    return "\n".join(lines)
def main():
    """CLI entry point: parse arguments, split large audio files if needed,
    transcribe each chunk, convert Traditional -> Simplified Chinese, and
    write the result as txt, srt, or json.

    Fixes vs. original: removed a redundant local ``converter`` that shadowed
    the module-level OpenCC instance, and corrected the stale comment that
    claimed a 500 MB split threshold (the code tests 300 MB).
    """
    parser = argparse.ArgumentParser(
        description="本地中文语音转写(MLX Whisper,支持 Apple Silicon)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python transcribe.py lecture.mp3
python transcribe.py interview.mp3 --model large-v3-turbo --format srt
python transcribe.py audio.mp3 --model large-v3 --format json --output result.json
"""
    )
    parser.add_argument("input", help="输入音频文件路径(支持 mp3/m4a/wav/flac)")
    parser.add_argument(
        "--model", "-m",
        default="large-v3-turbo",
        choices=["tiny", "base", "small", "medium", "large-v3", "large-v3-turbo"],
        help="模型大小(默认:large-v3-turbo,速度与精度最佳平衡)"
    )
    parser.add_argument(
        "--format", "-f",
        default="txt",
        choices=["txt", "srt", "json"],
        help="输出格式(默认:txt)"
    )
    parser.add_argument(
        "--output", "-o",
        default=None,
        help="输出文件路径(默认:与输入同目录,自动命名)"
    )
    parser.add_argument(
        "--chunk", "-c",
        type=int,
        default=25,
        help="分段时长(分钟,默认 25 分钟,大文件自动切分)"
    )
    parser.add_argument(
        "--language", "-l",
        default="zh",
        help="语言代码(默认:zh,普通话)"
    )
    args = parser.parse_args()

    # Validate the input file before doing any work.
    if not os.path.exists(args.input):
        print(f"[错误] 文件不存在:{args.input}", file=sys.stderr)
        sys.exit(1)
    file_size_mb = os.path.getsize(args.input) / (1024 * 1024)
    print(f"[信息] 文件大小:{file_size_mb:.1f} MB,模型:{args.model}")

    # Output path: explicit --output wins, otherwise
    # <input-stem>_transcript.<ext> next to the input file.
    input_stem = Path(args.input).stem
    ext_map = {"txt": ".txt", "srt": ".srt", "json": ".json"}
    output_path = args.output or str(
        Path(args.input).parent / f"{input_stem}_transcript{ext_map[args.format]}"
    )

    # Fail fast with an actionable message if mlx-whisper is not installed.
    try:
        import mlx_whisper  # noqa: F401 -- availability probe only
    except ImportError:
        print("[错误] 请先安装:pip install mlx-whisper", file=sys.stderr)
        sys.exit(1)

    # Split files larger than 300 MB into --chunk-minute segments.
    # NOTE(review): memory pressure tracks duration, not byte size — a
    # low-bitrate file could be very long; consider probing duration instead.
    need_split = file_size_mb > 300
    tmpdir = None
    if need_split:
        print(f"[信息] 文件较大,自动切分为 {args.chunk} 分钟片段...")
        chunks, tmpdir = split_audio(args.input, args.chunk)
        print(f"[信息] 共 {len(chunks)} 个片段")
    else:
        chunks = [args.input]

    all_segments = []       # merged segments across all chunks, time-shifted
    full_text_parts = []    # per-chunk plain text, joined for txt/json stats
    time_offset = 0.0       # cumulative duration of chunks already processed
    try:
        for i, chunk_path in enumerate(chunks):
            if len(chunks) > 1:
                print(f"[转写] 片段 {i+1}/{len(chunks)}:{Path(chunk_path).name}")
            else:
                print("[转写] 正在处理,请稍候...")
            result = transcribe_file(chunk_path, args.model, args.language)

            # Merge segments, shifting timestamps by the accumulated offset
            # so subtitle times are relative to the original file.
            for seg in result.get("segments", []):
                adjusted = dict(seg)
                adjusted["start"] += time_offset
                adjusted["end"] += time_offset
                all_segments.append(adjusted)
            full_text_parts.append(result.get("text", "").strip())

            # Advance the offset by this chunk's exact duration (ffprobe);
            # whisper's timestamps restart at 0 for each chunk.
            if need_split and i < len(chunks) - 1:
                probe = subprocess.run(
                    ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                     "-of", "default=noprint_wrappers=1:nokey=1", chunk_path],
                    capture_output=True, text=True
                )
                time_offset += float(probe.stdout.strip() or 0)

        # Write output, converting Traditional Chinese to Simplified via the
        # module-level OpenCC converter.
        full_text = "\n".join(full_text_parts)
        if args.format == "txt":
            with open(output_path, "w", encoding="utf-8") as f:
                # One segment per line for readability.
                for seg in all_segments:
                    text = converter.convert(seg["text"].strip())
                    f.write(text.strip() + "\n")
        elif args.format == "srt":
            for seg in all_segments:
                seg["text"] = converter.convert(seg["text"])
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(format_srt(all_segments))
        elif args.format == "json":
            full_text = converter.convert(full_text)
            for seg in all_segments:
                seg["text"] = converter.convert(seg["text"])
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump({
                    "text": full_text,
                    "segments": all_segments,
                    "language": args.language,
                    "model": args.model,
                }, f, ensure_ascii=False, indent=2)
        print(f"[完成] 转写结果已保存至:{output_path}")
        print(f"[统计] 共 {len(all_segments)} 个语音段,约 {len(full_text)} 字")
    finally:
        # Always remove the temporary chunk directory, even on failure.
        if tmpdir and os.path.exists(tmpdir):
            shutil.rmtree(tmpdir)
# Standard script entry guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()