# Forum paste header ("[Python] view as plain text / copy code") - not part of the program.
import io
import hashlib
import os
import sys
import traceback
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
# PySide6 is a hard runtime dependency. If it is missing, exit with a readable
# message instead of letting a raw ImportError traceback reach the user.
try:
    from PySide6.QtCore import QObject, Qt, QThread, Signal, Slot
    from PySide6.QtWidgets import (
        QApplication,
        QCheckBox,
        QFileDialog,
        QHBoxLayout,
        QLabel,
        QListWidget,
        QListWidgetItem,
        QMainWindow,
        QMessageBox,
        QPlainTextEdit,
        QPushButton,
        QProgressBar,
        QVBoxLayout,
        QWidget,
        QLineEdit,
    )
except ImportError as exc:  # pragma: no cover - runtime dependency guard
    raise SystemExit(
        "缺少 PySide6 依赖,请先运行 initialize.bat 或执行 pip install PySide6。"
    ) from exc
from ColorPdfSpliter import splitPDF
class SignalWriter(io.TextIOBase):
    """Write-only text stream that forwards each non-blank line to a signal.

    Text is buffered until a newline arrives. Every completed line is stripped
    of surrounding whitespace and, if anything is left, passed to
    ``signal.emit(line)``. Whitespace-only lines are dropped.

    Args:
        signal: Any object exposing ``emit(str)``.
    """

    def __init__(self, signal):
        super().__init__()
        self.signal = signal
        # Text received after the most recent newline; not emitted yet.
        self._buffer = ""

    def writable(self):
        """Report that the stream accepts writes."""
        return True

    def write(self, text):
        """Buffer *text* and emit every line completed by it.

        Returns:
            The number of characters accepted, as the ``io.TextIOBase.write``
            contract requires (0 for empty input).
        """
        if not text:
            return 0
        self._buffer += text
        # Everything before the last newline is complete; the final piece
        # (possibly empty) stays buffered until more text or flush() arrives.
        *completed, self._buffer = self._buffer.split("\n")
        for raw_line in completed:
            line = raw_line.strip()
            if line:
                self.signal.emit(line)
        return len(text)

    def flush(self):
        """Emit whatever partial line is still buffered, then clear the buffer."""
        line = self._buffer.strip()
        if line:
            self.signal.emit(line)
        self._buffer = ""
class SplitWorker(QObject):
    """Worker object that runs ``splitPDF`` over a list of files.

    Intended to be moved to a ``QThread`` and started through :meth:`run`.

    Signals:
        log(str): One line of log text, including lines captured from
            stdout/stderr while ``splitPDF`` runs.
        fileStarted(str, int, int): Source path, 1-based index, total count.
        progressChanged(int, int): Current and total values reported by
            ``splitPDF`` through the progress callback.
        fileFinished(str, bool, str): Source path, success flag, export dir.
        finished(): Always emitted last, whether or not an error occurred.
        error(str): Formatted traceback of the exception that stopped the run.

    Args:
        files: Iterable of PDF paths to process, in order.
        duplex: Forwarded to ``splitPDF`` as ``duplex``.
        use_custom_output: When true, write into ``output_dir`` instead of
            each source file's own directory.
        output_dir: Target directory used when ``use_custom_output`` is true.
    """

    log = Signal(str)
    fileStarted = Signal(str, int, int)
    progressChanged = Signal(int, int)
    fileFinished = Signal(str, bool, str)
    finished = Signal()
    error = Signal(str)

    def __init__(self, files, duplex=False, use_custom_output=False, output_dir=""):
        super().__init__()
        self._files = list(files)
        self._duplex = duplex
        self._use_custom_output = use_custom_output
        self._output_dir = output_dir

    @Slot()
    def run(self):
        """Process every file in order; stop at the first failure.

        A failing file is reported through ``fileFinished(path, False, dir)``
        followed by ``error(traceback)``. ``finished`` is emitted in all cases.
        """
        stdout_writer = SignalWriter(self.log)
        stderr_writer = SignalWriter(self.log)
        try:
            total_files = len(self._files)
            for index, file_path in enumerate(self._files, start=1):
                source_path = Path(file_path)
                export_dir, name_prefix = self._resolve_output(source_path)
                self.fileStarted.emit(str(source_path), index, total_files)
                self.log.emit(f"▶ 开始处理:{source_path}")
                try:
                    if export_dir:
                        os.makedirs(export_dir, exist_ok=True)
                    self._split_one(
                        source_path, export_dir, name_prefix, stdout_writer, stderr_writer
                    )
                except Exception:
                    # Tell listeners which file failed before the outer handler
                    # reports the traceback and ends the batch.
                    self.fileFinished.emit(str(source_path), False, export_dir)
                    raise
                self.fileFinished.emit(str(source_path), True, export_dir)
            self.log.emit("✅ 所有文件处理完成")
        except Exception:
            # Top-level boundary of the worker: surface the traceback via signal.
            self.error.emit(traceback.format_exc())
        finally:
            self.finished.emit()

    def _resolve_output(self, source_path):
        """Return ``(export_dir, name_prefix)`` for one source file."""
        export_dir = self._output_dir if self._use_custom_output else str(source_path.parent)
        export_dir = str(Path(export_dir)) if export_dir else ""
        name_prefix = ""
        if self._use_custom_output:
            # Short digest of the full source path. Presumably this keeps
            # same-named PDFs from different folders apart in a shared output
            # directory - confirm against splitPDF.
            name_prefix = hashlib.sha1(str(source_path).encode("utf-8")).hexdigest()[:8]
        return export_dir, name_prefix

    def _split_one(self, source_path, export_dir, name_prefix, stdout_writer, stderr_writer):
        """Run ``splitPDF`` on one file with stdout/stderr routed to the log."""
        try:
            # NOTE: redirect_stdout/redirect_stderr replace sys.stdout and
            # sys.stderr for the whole process, not just this thread, so output
            # printed from any thread while this runs also lands in the log.
            with redirect_stdout(stdout_writer), redirect_stderr(stderr_writer):
                splitPDF(
                    str(source_path),
                    self._on_progress,
                    exportdir=export_dir,
                    duplex=self._duplex,
                    name_prefix=name_prefix,
                )
        finally:
            # Emit any trailing text that was written without a final newline;
            # otherwise it would stay buffered and never reach the log.
            stdout_writer.flush()
            stderr_writer.flush()

    def _on_progress(self, current, total, _title=""):
        """Progress callback handed to ``splitPDF``; re-emits it as a signal."""
        self.progressChanged.emit(int(current), int(total))
class MainWindow(QMainWindow):
    """Main window: PDF list, output options, start button, progress and log.

    Processing runs in a ``SplitWorker`` living on a ``QThread``; the window
    only reacts to the worker's signals.

    Args:
        duplex_default: Initial checked state of the duplex checkbox.
    """

    def __init__(self, duplex_default=False):
        super().__init__()
        self.setWindowTitle("ColorPdfSpliter - PySide")
        self.resize(920, 640)
        self.worker_thread = None
        self.worker = None
        # Set by on_worker_error so the final status does not claim success.
        self._had_error = False

        central = QWidget(self)
        self.setCentralWidget(central)
        root_layout = QVBoxLayout(central)

        # --- PDF list -------------------------------------------------------
        root_layout.addWidget(self._make_section_title("PDF 列表"))
        self.file_list = QListWidget()
        self.file_list.setSelectionMode(QListWidget.SelectionMode.ExtendedSelection)
        root_layout.addWidget(self.file_list, 3)

        file_button_row = QHBoxLayout()
        self.add_button = QPushButton("添加 PDF")
        self.remove_button = QPushButton("移除选中")
        self.clear_button = QPushButton("清空列表")
        file_button_row.addWidget(self.add_button)
        file_button_row.addWidget(self.remove_button)
        file_button_row.addWidget(self.clear_button)
        file_button_row.addStretch(1)
        root_layout.addLayout(file_button_row)

        # --- Output settings ------------------------------------------------
        root_layout.addWidget(self._make_section_title("输出设置"))
        self.custom_output_checkbox = QCheckBox("输出到指定目录")
        self.output_dir_edit = QLineEdit()
        self.output_dir_edit.setReadOnly(True)
        self.output_dir_edit.setPlaceholderText("与 PDF 同目录")
        self.output_dir_edit.setEnabled(False)
        self.output_browse_button = QPushButton("选择目录")
        self.output_browse_button.setEnabled(False)
        output_row = QHBoxLayout()
        output_row.addWidget(self.custom_output_checkbox)
        output_row.addWidget(self.output_dir_edit, 1)
        output_row.addWidget(self.output_browse_button)
        root_layout.addLayout(output_row)

        options_row = QHBoxLayout()
        self.duplex_checkbox = QCheckBox("双面打印")
        self.duplex_checkbox.setChecked(duplex_default)
        options_row.addWidget(self.duplex_checkbox)
        options_row.addStretch(1)
        root_layout.addLayout(options_row)

        # --- Start button, progress and status ------------------------------
        action_row = QHBoxLayout()
        self.start_button = QPushButton("开始处理")
        self.start_button.setEnabled(False)
        self.progress_bar = QProgressBar()
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)
        self.progress_bar.setFormat("%p%")
        action_row.addWidget(self.start_button)
        action_row.addWidget(self.progress_bar, 1)
        root_layout.addLayout(action_row)

        self.status_label = QLabel("就绪")
        root_layout.addWidget(self.status_label)

        # --- Log view -------------------------------------------------------
        root_layout.addWidget(self._make_section_title("日志"))
        self.log_view = QPlainTextEdit()
        self.log_view.setReadOnly(True)
        root_layout.addWidget(self.log_view, 2)

        # --- Signal wiring --------------------------------------------------
        self.add_button.clicked.connect(self.add_files)
        self.remove_button.clicked.connect(self.remove_selected_files)
        self.clear_button.clicked.connect(self.clear_files)
        self.custom_output_checkbox.toggled.connect(self.on_output_mode_changed)
        self.output_browse_button.clicked.connect(self.choose_output_dir)
        self.start_button.clicked.connect(self.start_processing)
        self.file_list.itemSelectionChanged.connect(self.update_buttons_state)

        # Sync button states with the (empty) list; without this the remove
        # and clear buttons start out enabled with nothing to act on.
        self.update_buttons_state()

    @staticmethod
    def _make_section_title(text):
        """Create a bold label used as a section heading."""
        label = QLabel(text)
        label.setStyleSheet("font-weight: bold;")
        return label

    def closeEvent(self, event):
        """Refuse to close while the worker thread is still running.

        Letting the window (the QThread's parent) be destroyed while the
        thread runs would tear down a running QThread.
        """
        if self.worker_thread is not None:
            QMessageBox.warning(self, "提示", "处理尚未完成。请等待处理结束后再关闭窗口。")
            event.ignore()
            return
        super().closeEvent(event)

    def append_log(self, text):
        """Append one entry to the log view."""
        self.log_view.appendPlainText(text)

    def update_buttons_state(self):
        """Enable or disable remove/clear/start according to current state."""
        idle = self.worker_thread is None
        has_files = self.file_list.count() > 0
        has_selection = bool(self.file_list.selectedItems())
        self.remove_button.setEnabled(idle and has_selection)
        self.clear_button.setEnabled(idle and has_files)
        # With a custom output directory requested, a directory must be chosen.
        output_ready = (not self.custom_output_checkbox.isChecked()) or bool(
            self.output_dir_edit.text().strip()
        )
        self.start_button.setEnabled(has_files and idle and output_ready)

    def on_output_mode_changed(self, checked):
        """React to the custom-output checkbox being toggled."""
        self.output_browse_button.setEnabled(checked)
        self.output_dir_edit.setEnabled(checked)
        if checked and not self.output_dir_edit.text().strip():
            self.output_dir_edit.setPlaceholderText("请选择输出目录")
        if not checked:
            self.output_dir_edit.setPlaceholderText("与 PDF 同目录")
        self.update_buttons_state()

    def choose_output_dir(self):
        """Let the user pick the output directory; keep the old one on cancel."""
        directory = QFileDialog.getExistingDirectory(self, "选择输出文件夹")
        if directory:
            self.output_dir_edit.setText(directory)
        self.update_buttons_state()

    def add_files(self):
        """Ask for PDF files and append the ones not already in the list."""
        files, _ = QFileDialog.getOpenFileNames(
            self,
            "选择一个或多个 PDF 文件",
            "",
            "PDF Files (*.pdf);;All Files (*)",
        )
        if not files:
            return
        existing = set(self._selected_files())
        added = 0
        for file_path in files:
            # Resolve first so the same file picked via different paths is
            # recognised as a duplicate.
            normalized = str(Path(file_path).resolve())
            if normalized in existing:
                continue
            item = QListWidgetItem(normalized)
            item.setData(Qt.ItemDataRole.UserRole, normalized)
            self.file_list.addItem(item)
            existing.add(normalized)
            added += 1
        self.append_log(f"已添加 {added} 个 PDF 文件")
        self.update_buttons_state()

    def remove_selected_files(self):
        """Remove the currently selected entries from the list."""
        for item in self.file_list.selectedItems():
            row = self.file_list.row(item)
            self.file_list.takeItem(row)
        self.append_log("已移除选中的文件")
        self.update_buttons_state()

    def clear_files(self):
        """Remove every entry from the list."""
        self.file_list.clear()
        self.append_log("已清空文件列表")
        self.update_buttons_state()

    def _selected_files(self):
        """Return the stored path of every list entry (not only selected rows)."""
        return [
            self.file_list.item(i).data(Qt.ItemDataRole.UserRole)
            for i in range(self.file_list.count())
        ]

    def start_processing(self):
        """Validate the inputs, then start a worker thread for all listed files."""
        if self.worker_thread is not None:
            # A run is already in progress; never start a second worker.
            return
        files = self._selected_files()
        if not files:
            QMessageBox.warning(self, "提示", "请先添加一个或多个 PDF 文件。")
            return
        use_custom_output = self.custom_output_checkbox.isChecked()
        output_dir = self.output_dir_edit.text().strip() if use_custom_output else ""
        if use_custom_output and not output_dir:
            QMessageBox.warning(self, "提示", "请先选择输出文件夹。")
            return

        self._had_error = False
        self.log_view.clear()
        self.append_log(f"待处理文件:{len(files)} 个")
        self.append_log(f"双面打印:{'是' if self.duplex_checkbox.isChecked() else '否'}")
        self.append_log(f"输出模式:{'指定目录 ' + output_dir if use_custom_output else '与源文件同目录'}")
        self._set_processing_state(True)

        self.worker_thread = QThread(self)
        self.worker = SplitWorker(
            files,
            duplex=self.duplex_checkbox.isChecked(),
            use_custom_output=use_custom_output,
            output_dir=output_dir,
        )
        self.worker.moveToThread(self.worker_thread)
        self.worker_thread.started.connect(self.worker.run)
        self.worker.log.connect(self.append_log)
        self.worker.fileStarted.connect(self.on_file_started)
        self.worker.progressChanged.connect(self.on_progress_changed)
        self.worker.fileFinished.connect(self.on_file_finished)
        self.worker.error.connect(self.on_worker_error)
        # Teardown chain: worker done -> thread quits -> window is notified,
        # with both objects scheduled for deletion.
        self.worker.finished.connect(self.worker_thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        self.worker_thread.finished.connect(self.on_worker_finished)
        self.worker_thread.finished.connect(self.worker_thread.deleteLater)
        self.worker_thread.start()

    def _set_processing_state(self, is_processing):
        """Lock the input widgets while processing, unlock them afterwards.

        The remove and start buttons are always disabled here; callers run
        ``update_buttons_state`` afterwards to re-enable them when appropriate.
        """
        idle = not is_processing
        custom_output = self.custom_output_checkbox.isChecked()
        self.add_button.setEnabled(idle)
        self.remove_button.setEnabled(False)
        self.clear_button.setEnabled(idle and self.file_list.count() > 0)
        self.start_button.setEnabled(False)
        self.custom_output_checkbox.setEnabled(idle)
        self.output_browse_button.setEnabled(idle and custom_output)
        self.output_dir_edit.setEnabled(idle and custom_output)
        self.duplex_checkbox.setEnabled(idle)
        self.file_list.setEnabled(idle)
        if is_processing:
            self.status_label.setText("处理中...")
        else:
            self.status_label.setText("就绪")

    def on_file_started(self, file_path, index, total):
        """Reset the progress bar and announce the file being processed."""
        self.status_label.setText(f"正在处理第 {index}/{total} 个文件:{Path(file_path).name}")
        self.progress_bar.setValue(0)
        self.progress_bar.setRange(0, 100)
        self.append_log(f"文件 {index}/{total}:{file_path}")

    def on_progress_changed(self, current, total):
        """Show progress for the current file, clamped to a valid range."""
        total = max(int(total), 1)
        current = max(0, min(int(current), total))
        self.progress_bar.setRange(0, total)
        self.progress_bar.setValue(current)
        self.status_label.setText(f"当前页面进度:{current}/{total}")

    def on_file_finished(self, file_path, ok, export_dir):
        """Log the outcome for one file."""
        if ok:
            if export_dir:
                self.append_log(f"✅ 完成:{file_path} -> {export_dir}")
            else:
                self.append_log(f"✅ 完成:{file_path}")
        else:
            self.append_log(f"❌ 失败:{file_path}")

    def on_worker_error(self, error_text):
        """Log the worker's traceback and show it in a dialog."""
        # Record the failure before opening the modal dialog: the thread's
        # finished signal may be delivered while the dialog is still open.
        self._had_error = True
        self.append_log("❌ 处理过程中发生错误:")
        self.append_log(error_text)
        QMessageBox.critical(self, "处理失败", error_text)

    def on_worker_finished(self):
        """Restore the idle UI and show the final status of the run."""
        self.worker_thread = None
        self.worker = None
        self._set_processing_state(False)
        self.update_buttons_state()
        if self._had_error:
            # Leave the progress bar where it stopped; the run did not complete.
            self.status_label.setText("处理失败")
        else:
            self.progress_bar.setValue(self.progress_bar.maximum())
            self.status_label.setText("全部处理完成")
def main():
    """Start the Qt application and exit with the event loop's status.

    A ``--duplex`` argument on the command line pre-checks the duplex option
    in the main window.
    """
    qt_app = QApplication(sys.argv)
    wants_duplex = any(arg == "--duplex" for arg in sys.argv[1:])
    main_window = MainWindow(duplex_default=wants_duplex)
    main_window.show()
    exit_status = qt_app.exec()
    sys.exit(exit_status)


if __name__ == "__main__":
    main()