# [Python] plain-text view / copy code  (forum listing header; not part of the program)
import tkinter as tk
from tkinter import filedialog, messagebox, ttk, Toplevel, StringVar, Checkbutton, Button
import logging
import pdfplumber
import re
import os
import subprocess
import xlwt
import datetime
# NOTE(review): presumably the folder holding the PDFs to process; it is only
# initialised here and assigned elsewhere in the file - confirm against callers.
pdf_files_folder = None
# Configure logging: records go to app.log, which is truncated on every start
# because of filemode='w'.
logging.basicConfig(filename='app.log',
                    filemode='w',
                    format='%(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)
# Sort direction shared by every column; column_sorter flips it on each call.
reverse = False
# Sorting helpers for the result table
def sorter(tree, column, data_type, reverse):
    """Reorder the top-level rows of ``tree`` by the values shown in ``column``.

    Args:
        tree: Treeview-like object providing get_children, set and move.
        column: Identifier of the column whose cell values are compared.
        data_type: 'num' to compare as floats; anything else compares strings.
            If any cell cannot be converted to float, all rows fall back to
            string comparison.
        reverse: True for descending order.
    """
    rows = [(tree.set(item, column), item) for item in tree.get_children('')]
    if data_type == 'num':
        try:
            rows = [(float(cell), item) for cell, item in rows]
        except ValueError:
            # At least one cell is not numeric: keep the string keys.
            pass
    ordered = sorted(rows, reverse=reverse)
    for position, (_cell, item) in enumerate(ordered):
        tree.move(item, '', position)
def column_sorter(tree, column, data_type='str'):
    """Toggle the shared sort direction, then sort ``tree`` by ``column``.

    The direction lives in the module-level ``reverse`` flag, so it is shared
    by all columns: every call flips it, whichever column is sorted.
    """
    global reverse
    flipped = not reverse
    reverse = flipped
    sorter(tree, column, data_type, flipped)
def is_patent_fee_receipt(text):
    """Return True when ``text`` looks like a patent annual-fee receipt.

    The text must carry a non-tax-receipt marker ('非税收入' or '票据(电子)')
    and also mention '专利' or '年费'.
    """
    has_receipt_marker = '非税收入' in text or '票据(电子)' in text
    if not has_receipt_marker:
        return False
    return '专利' in text or '年费' in text
def extract_patent_fee_data(text):
    """Extract the fields of a patent annual-fee receipt (non-tax income receipt).

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict with the keys is_patent, seller_name, tax_id, amount, tax_amount,
        total_amount, invoice_number, date and category. When
        is_patent_fee_receipt(text) is false the defaults are returned
        unchanged (is_patent False). ``date`` is whatever normalize_date
        returns for the matched date string, or None when no date is found.
    """
    result = {
        "is_patent": False,
        "seller_name": "",
        "tax_id": "",
        "amount": 0.0,
        "tax_amount": 0.0,
        "total_amount": 0.0,
        "invoice_number": "",
        "date": None,
        "category": "专利年费"
    }
    if not is_patent_fee_receipt(text):
        return result
    result["is_patent"] = True
    logging.info("检测到专利年费票据")

    # Receipt number
    invoice_match = re.search(r'票据号码[::]\s*(\d+)', text)
    if invoice_match:
        result["invoice_number"] = invoice_match.group(1)

    # Payee (seller). Every pattern has exactly one capture group. The bounded
    # patterns come first so that trailing fields on the same line (reviewer,
    # cashier, ...) are not swallowed; the unbounded one is the last resort.
    seller_patterns = [
        r'(国家知识产权局[^\n]*?专利局)',
        r'(国家[^,,\n]*?专利局)',
        r'(国家知识产权局[^\n]*)',
    ]
    for pattern in seller_patterns:
        match = re.search(pattern, text)
        if match:
            result["seller_name"] = match.group(1).strip()
            break
    if not result["seller_name"]:
        # Look for any "国家 ... 专利局" span, else use the standard name.
        patent_match = re.search(r'(国家[^\n]*?专利局)', text)
        if patent_match:
            result["seller_name"] = patent_match.group(1).strip()
        else:
            result["seller_name"] = "国家知识产权局专利局"

    # Issue date: ISO-like form first, then the Chinese year/month/day form.
    date_match = re.search(r'开票日期[::]\s*(\d{4}-\d{1,2}-\d{1,2})', text)
    if not date_match:
        date_match = re.search(r'开票日期[::]\s*(\d{4}年\d{1,2}月\d{1,2}日)', text)
    if date_match:
        result["date"] = normalize_date(date_match.group(1))

    # Item name: pure-Chinese fragments only, most specific pattern first.
    category_patterns = [
        r'([\u4e00-\u9fa5]+(?:专利第[一二三四五六七八九十\d]+年年费|年费|专利))',
        r'([\u4e00-\u9fa5]*专利[\u4e00-\u9fa5]*年费)',
        r'([\u4e00-\u9fa5]*年费[\u4e00-\u9fa5]*)',
    ]
    for pattern in category_patterns:
        project_match = re.search(pattern, text)
        if project_match:
            result["category"] = project_match.group(1).strip()
            break

    # Amount (figures). The number pattern accepts thousands separators such
    # as "2,000.00"; they are removed before conversion.
    number = r'(\d[\d,]*(?:\.\d{1,2})?)'
    amount_match = re.search(r'(?:金额合计|合\s*计)[((]小写[))]\s*[¥¥]?\s*' + number, text)
    if not amount_match:
        # Fallback: last number on a table row mentioning the fee,
        # e.g. "发明专利第7年年费 0.15 2,000.00 300.00".
        amount_match = re.search(r'(?:年费|专利|缴费)[^\n]*?' + number + r'\s*$', text, re.MULTILINE)
    if amount_match:
        result["amount"] = float(amount_match.group(1).replace(',', ''))
        result["total_amount"] = result["amount"]
        result["tax_amount"] = 0.0  # the code treats patent fees as carrying no tax

    # These receipts carry no taxpayer ID; store a fixed placeholder.
    result["tax_id"] = "非税收入无税号"
    logging.info(f"专利年费-提取结果: 销售方={result['seller_name']}, 金额={result['amount']}, 项目={result['category']}")
    return result
def is_railway_ticket(text):
    """Return True when ``text`` looks like a railway e-ticket.

    Either the literal '铁路电子客票' is present, or both '12306' and '铁路' are.
    """
    if '铁路电子客票' in text:
        return True
    return '12306' in text and '铁路' in text
def extract_railway_ticket_data(text):
    """Extract the fields of a railway e-ticket.

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict with the keys is_railway, seller_name, tax_id, buyer_name,
        buyer_tax_id, amount, tax_amount, total_amount, invoice_number, date
        and category. When is_railway_ticket(text) is false the defaults are
        returned unchanged (is_railway False).
    """
    result = {
        "is_railway": False,
        "seller_name": "",
        "tax_id": "",
        "buyer_name": "",
        "buyer_tax_id": "",
        "amount": 0.0,
        "tax_amount": 0.0,
        "total_amount": 0.0,
        "invoice_number": "",
        "date": None,
        "category": "铁路客运"
    }
    if not is_railway_ticket(text):
        return result
    result["is_railway"] = True
    logging.info("检测到铁路电子客票")

    # Invoice number
    number_found = re.search(r'发票号码[::]\s*(\d+)', text)
    if number_found:
        result["invoice_number"] = number_found.group(1)

    # Issue date
    date_found = re.search(r'开票日期[::]\s*(\d{4}年\d{1,2}月\d{1,2}日)', text)
    if date_found:
        result["date"] = normalize_date(date_found.group(1))

    # Fare. The digit counts are limited so that long numbers such as ID-card
    # numbers cannot be mistaken for a price. Order: explicit "票价:" label,
    # then a currency-marked amount at the end of a line.
    fare_found = (
        re.search(r'票价[::]\s*[¥¥]?\s*(\d{1,4}\.\d{1,2})', text)
        or re.search(r'[¥¥]\s*(\d{1,4}\.\d{2})\s*\n', text)
    )
    if fare_found:
        fare = float(fare_found.group(1))
        result["amount"] = fare
        result["total_amount"] = fare
        result["tax_amount"] = 0.0  # the code records no tax for rail tickets
    else:
        # Last resort: any small currency-marked amount.
        fare_found = re.search(r'[¥¥]\s*(\d{1,3}\.\d{2})', text)
        if fare_found:
            fare = float(fare_found.group(1))
            if fare < 10000:
                result["amount"] = fare
                result["total_amount"] = fare
                result["tax_amount"] = 0.0

    # Buyer name, optionally followed on the same line by the credit code.
    buyer_found = re.search(r'购买方名称[::]\s*([^\n]+)', text)
    if buyer_found:
        buyer_line = buyer_found.group(1).strip()
        pieces = re.split(r'\s+统一社会信用代码[::]', buyer_line)
        if len(pieces) < 2:
            result["buyer_name"] = buyer_line
        else:
            result["buyer_name"] = pieces[0].strip()
            result["buyer_tax_id"] = pieces[1].strip()

    # The seller is always recorded under this fixed name.
    result["seller_name"] = "国家铁路局"
    # The credit code printed on the ticket (the buyer's) is reused as tax_id;
    # a placeholder is stored when none was found.
    result["tax_id"] = result["buyer_tax_id"] or "铁路客票无税号"

    # Departure / arrival stations refine the category text.
    stations = re.findall(r'([\u4e00-\u9fa5]+站)', text)
    station_count = len(stations)
    if station_count >= 2:
        plausible = [name for name in stations
                     if len(name) >= 3 and '站' in name and '12306' not in name]
        if len(plausible) >= 2:
            result["category"] = f"铁路客运-{plausible[0]}→{plausible[-1]}"
        elif len(plausible) == 1:
            result["category"] = f"铁路客运-{plausible[0]}"
    elif station_count == 1:
        result["category"] = f"铁路客运-{stations[0]}"

    logging.info(f"铁路客票-提取结果: 销售方={result['seller_name']}, 金额={result['amount']}, 类别={result['category']}")
    return result
def normalize_date(date_str):
    """Parse a date written in one of three layouts into a datetime.date.

    Layouts are tried in this order: ``YYYY-M-D`` (must start the string),
    ``YYYY年M月D日`` (anywhere in the string, spaces around the numbers are
    tolerated) and ``YYYY/M/D`` (must start the string). The first layout
    that matches decides the result.

    Args:
        date_str: The value to parse; it is converted with str() and stripped.

    Returns:
        A datetime.date, or None when the input is falsy, matches no layout,
        or the matched numbers do not form a valid calendar date.
    """
    if not date_str:
        return None
    cleaned = str(date_str).strip()
    layouts = (
        (re.match, r'(\d{4})-(\d{1,2})-(\d{1,2})'),
        (re.search, r'(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日'),
        (re.match, r'(\d{4})/(\d{1,2})/(\d{1,2})'),
    )
    for finder, pattern in layouts:
        found = finder(pattern, cleaned)
        if found is None:
            continue
        year, month, day = (int(part) for part in found.groups())
        try:
            return datetime.date(year, month, day)
        except ValueError:
            # The layout matched but the date is impossible (e.g. Feb 30).
            return None
    return None
def is_airline_ticket(text):
    """Return True when ``text`` contains one of the air-transport markers."""
    markers = ('航空运输电子客票行程单', '航空运输')
    return any(marker in text for marker in markers)
def extract_airline_ticket_data(text):
    """Extract the fields of an air-transport e-ticket itinerary.

    Amounts are read from every ``CNY <number>`` occurrence, in text order,
    and interpreted by position (see the comments below).

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict with the keys is_airline, seller_name, tax_id, buyer_name,
        buyer_tax_id, amount, tax_amount, total_amount, invoice_number, date
        and category. When the text is recognised, a ``tax_rate`` key (in
        percent) is added as well. When is_airline_ticket(text) is false the
        defaults are returned unchanged (is_airline False, no tax_rate key).
    """
    result = {
        "is_airline": False,
        "seller_name": "",
        "tax_id": "",
        "buyer_name": "",
        "buyer_tax_id": "",
        "amount": 0.0,
        "tax_amount": 0.0,
        "total_amount": 0.0,
        "invoice_number": "",
        "date": None,
        "category": "航空客运"
    }
    if not is_airline_ticket(text):
        return result
    result["is_airline"] = True
    logging.info("检测到航空客票格式")

    # Invoice number
    invoice_match = re.search(r'发票号码[::]\s*(\d+)', text)
    if invoice_match:
        result["invoice_number"] = invoice_match.group(1)

    # Issuing unit (seller); drop an issue date that landed on the same line.
    seller_match = re.search(r'填开单位[::][ \t]*([^\n]+)', text)
    if seller_match:
        seller_name = seller_match.group(1).strip()
        seller_name = re.sub(r'\s*填开日期.*', '', seller_name)
        result["seller_name"] = seller_name

    # Issue date
    date_match = re.search(r'填开日期[::]\s*(\d{4}年\d{1,2}月\d{1,2}日)', text)
    if date_match:
        result["date"] = normalize_date(date_match.group(1))

    lines = text.split('\n')
    logging.info(f"航空客票-所有行数: {len(lines)}")

    # Lines that hold "CNY <digits>" are value rows (not header rows).
    cny_value_lines = [
        (i, line) for i, line in enumerate(lines)
        if 'CNY' in line and re.search(r'CNY\s*\d+', line)
    ]
    logging.info(f"航空客票-包含CNY数值的行: {cny_value_lines}")

    # Every number that follows "CNY". The pattern only yields digit strings,
    # so float() cannot fail here.
    all_cny_values = [
        float(value)
        for _line_idx, line in cny_value_lines
        for value in re.findall(r'CNY\s*(\d+\.?\d*)', line)
    ]
    logging.info(f"航空客票-所有CNY数值: {all_cny_values}")

    # Positional meaning assumed by this code:
    # fare, fuel surcharge, VAT amount, civil-aviation fund, total.
    fare = 0.0
    fuel = 0.0
    fund = 0.0
    tax = 0.0
    total = 0.0
    if len(all_cny_values) >= 5:
        fare = all_cny_values[0]
        fuel = all_cny_values[1]
        tax = all_cny_values[2]
        fund = all_cny_values[3]
        total = all_cny_values[-1]
    elif len(all_cny_values) >= 4:
        fare = all_cny_values[0]
        fuel = all_cny_values[1]
        tax = all_cny_values[2]
        total = all_cny_values[-1]
    elif len(all_cny_values) >= 3:
        fare = all_cny_values[0]
        # Accept the middle value as tax only when it is below the fare.
        tax = all_cny_values[1] if all_cny_values[1] < all_cny_values[0] else 0
        total = all_cny_values[-1]
    elif len(all_cny_values) >= 2:
        fare = all_cny_values[0]
        total = all_cny_values[-1]
    elif len(all_cny_values) >= 1:
        total = all_cny_values[0]
    logging.info(f"航空客票-解析结果: 票价={fare}, 燃油={fuel}, 基金={fund}, 税额={tax}, 合计={total}")

    # Tax rate: first percentage found on a line containing "CNY". The whole
    # number is captured, including a fractional part ("1.5%" -> 1.5).
    airline_tax_rate = 0.0
    for line in lines:
        if 'CNY' in line:
            rate_match = re.search(r'(\d+(?:\.\d+)?)\s*%', line)
            if rate_match:
                airline_tax_rate = float(rate_match.group(1))
                break

    result["tax_amount"] = tax
    result["total_amount"] = total
    # Net amount is total minus tax; fall back to the fare when that is not positive.
    result["amount"] = total - tax if total > tax else fare

    # Buyer name, optionally followed on the same line by the credit code.
    buyer_match = re.search(r'购买方名称[::]\s*([^\n]+)', text)
    if buyer_match:
        buyer_text = buyer_match.group(1).strip()
        parts = re.split(r'\s+统一社会信用代码[//]纳税人识别号[::]', buyer_text)
        if len(parts) >= 2:
            result["buyer_name"] = parts[0].strip()
            result["buyer_tax_id"] = parts[1].strip()
        else:
            result["buyer_name"] = buyer_text

    # No seller tax ID is extracted for itineraries; store a placeholder.
    result["tax_id"] = "航空客票行程单无税号"
    result["tax_rate"] = airline_tax_rate
    logging.info(f"航空客票最终结果: 不含税金额={result['amount']}, 税额={result['tax_amount']}, 价税合计={result['total_amount']}, 税率={airline_tax_rate}%, 购买方={result['buyer_name']}")
    return result
def is_travel_itinerary(text):
    """Return True for a generic itinerary that is not an air-transport one.

    The text must contain '行程单' and must not contain
    '航空运输电子客票行程单'.
    """
    if '航空运输电子客票行程单' in text:
        return False
    return '行程单' in text
def extract_travel_itinerary_data(text):
    """Extract the fields of a generic (non-airline) itinerary.

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict with the keys is_itinerary, seller_name, tax_id, amount,
        tax_amount, total_amount, invoice_number, date and category. When
        is_travel_itinerary(text) is false the defaults are returned unchanged
        (is_itinerary False). ``tax_id`` is never filled in by this function.
    """
    result = {
        "is_itinerary": False,
        "seller_name": "",
        "tax_id": "",
        "amount": 0.0,
        "tax_amount": 0.0,
        "total_amount": 0.0,
        "invoice_number": "",
        "date": None,
        "category": "行程单"
    }
    if not is_travel_itinerary(text):
        return result
    result["is_itinerary"] = True
    logging.info("检测到行程单格式")

    # Invoice / itinerary number
    invoice_match = re.search(r'(?:发票号码|行程单号|票据号码)[::]\s*(\d+)', text)
    if invoice_match:
        result["invoice_number"] = invoice_match.group(1)

    # Issuing unit (seller); drop an issue date that landed on the same line.
    seller_match = re.search(r'(?:填开单位|承运人|出票单位)[::][ \t]*([^\n]+)', text)
    if seller_match:
        seller_name = seller_match.group(1).strip()
        seller_name = re.sub(r'\s*填开日期.*', '', seller_name)
        result["seller_name"] = seller_name

    # Date: Chinese year/month/day form first, then the dashed form.
    date_match = re.search(r'(?:填开日期|开票日期|日期)[::]\s*(\d{4}年\d{1,2}月\d{1,2}日)', text)
    if not date_match:
        date_match = re.search(r'(?:填开日期|开票日期|日期)[::]\s*(\d{4}-\d{1,2}-\d{1,2})', text)
    if date_match:
        result["date"] = normalize_date(date_match.group(1))

    # Every number following "CNY", in text order. The pattern only yields
    # digit strings, so float() cannot fail here.
    cny_values = [
        float(value)
        for line in text.split('\n')
        if 'CNY' in line
        for value in re.findall(r'CNY\s*(\d+\.?\d*)', line)
    ]
    if cny_values:
        total = cny_values[-1]
        result["total_amount"] = total
        if len(cny_values) >= 4:
            # Same positional layout as the airline extractor: the VAT amount
            # is the third value and the total is the last one.
            result["tax_amount"] = cny_values[2]
        elif len(cny_values) >= 2:
            # With two or three values the third position, if present, IS the
            # total, so it must not be read as tax. Use the value just before
            # the total, and only when it is smaller than the total.
            previous = cny_values[-2]
            result["tax_amount"] = previous if previous < total else 0.0
        result["amount"] = result["total_amount"] - result["tax_amount"]
    else:
        # No CNY figures at all: fall back to the generic invoice parser.
        amounts = extract_amounts(text)
        result["amount"] = amounts["amount"]
        result["tax_amount"] = amounts["tax_amount"]
        result["total_amount"] = amounts["total_amount"]

    logging.info(f"行程单-提取结果: 销售方={result['seller_name']}, 金额={result['amount']}, 税额={result['tax_amount']}, 合计={result['total_amount']}")
    return result
def extract_seller_name(text):
    """Extract the seller's and the buyer's name from invoice text.

    Five strategies are tried for the seller, in order, until one yields a
    name:
      1. a "销 名称:" / "售 名称:" label (including the two-column layout
         where the names sit on a later line),
      2. a "销售方名称:" / "销方名称:" / "售方名称:" label,
      3. any "名称:" line that also carries a seller marker,
      4. the first company-looking string after the seller marker, when the
         seller marker comes after the buyer marker,
      5. the longest company-looking string in the second half of the text.
    Strategies 2-5 also merge a company name that continues on following lines.

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict ``{"seller": str, "buyer": str}``. ``seller`` is "未识别" when no
        name was found; ``buyer`` is "" when no buyer was found.
    """
    seller_name = ""
    lines = text.split('\n')

    def line_index_at(char_offset):
        """Return the index in ``lines`` of the line containing ``char_offset`` (0 if none)."""
        consumed = 0
        for index, line in enumerate(lines):
            consumed += len(line) + 1  # +1 for the newline removed by split
            if consumed > char_offset:
                return index
        return 0

    def merge_cross_line_company(company, start_line_idx):
        """Append continuation lines (up to 8 following lines) to a wrapped company name."""
        if start_line_idx + 1 >= len(lines):
            return company
        merged = company
        # A line containing any of these is clearly not part of a company name.
        exclude_keywords = ['银行', '账号', '地址', '电话', '传真', '开户', '税号',
                            '纳税人', '开户行', '收款', '复核', '开票', '销售方',
                            '购买方', '价税', '金额', '备注', '规格', '单位', '数量',
                            '税率', '税额', '大写', '小写', '合计', '货物', '服务',
                            '名称', '统一社会信用代码', '代码', '密文', '校验码',
                            '买售', '买方', '购方', '购买', '地址电话']
        # Typical endings of a company name.
        company_endings = ['公司', '站', '店', '厂', '院', '中心', '部', '所', '行',
                           '有限', '股份', '集团', '油站', '加油站', '服务区']
        for offset in range(1, min(9, len(lines) - start_line_idx)):
            next_line = lines[start_line_idx + offset].strip()
            if not next_line:
                continue  # blank lines are skipped, not treated as a stop
            next_line_no_space = next_line.replace(' ', '')
            if any(kw in next_line for kw in exclude_keywords):
                break
            # A continuation is short (<= 15 chars) and made only of Chinese
            # characters, digits and parentheses.
            if len(next_line_no_space) > 15:
                break
            if not re.match(r'^[\u4e00-\u9fa5\d()\(\)]+$', next_line_no_space):
                break
            # The name already looks complete and the next line is longer
            # than two characters: treat that line as unrelated content.
            if any(merged.endswith(end) for end in company_endings) and len(next_line_no_space) > 2:
                break
            merged = merged + next_line_no_space
            # Keep looking only when the appended piece itself ends like a
            # company name; otherwise stop after this single merge.
            if any(next_line_no_space.endswith(end) for end in company_endings):
                continue
            break
        return merged

    # Identify the buyer first so that it can be excluded later.
    buyer_name = ""
    buyer_patterns = [
        r'(?:购买方|购方|买方)\s*名称[::][ \t]*([^\n销售]+)',  # stops at a seller marker character
        r'名称[::][ \t]*([^\n]+?)(?:\s*[销售]|$)',  # text before a seller marker is the buyer
        r'买\s*名称[::][ \t]*([^\n销售]+)',  # label with the "方" character missing
    ]
    for pattern in buyer_patterns:
        match = re.search(pattern, text)
        if match:
            buyer_name = match.group(1).strip()
            # Drop bank/account details and anything after a colon.
            buyer_name = re.sub(r'\s*(银行|账号|:|:).*', '', buyer_name)
            break

    # Strategy 1: "销 名称:" / "售 名称:" label.
    pattern1 = re.search(r'[销售]\s*名称[::][ \t]*([^\n]*)', text)
    if pattern1:
        candidate = pattern1.group(1).strip()
        # Cut at a semicolon so that bank/account details are excluded.
        candidate = re.split(r'[;;\n]', candidate)[0].strip()
        line_idx = line_index_at(pattern1.start())
        if not candidate or len(candidate) < 2:
            # The label stands alone: search up to 30 following lines for a
            # line holding two company names (two-column layout).
            for offset in range(1, min(31, len(lines) - line_idx)):
                check_line = lines[line_idx + offset].strip()
                companies_on_line = re.findall(r'([一-龥()\(\)]{4,}(?:公司|有限|加油站|服务区|酒店|商店))', check_line)
                if len(companies_on_line) >= 2:
                    # Right-hand name is the seller, left-hand one the buyer.
                    candidate = companies_on_line[-1]
                    if not buyer_name:
                        buyer_name = companies_on_line[0]
                    logging.info(f'从两列布局行提取销售方: {candidate}, 购买方: {buyer_name}')
                    # The next line may hold both tax IDs; they are only logged here.
                    if offset + 1 < min(31, len(lines) - line_idx):
                        tax_line = lines[line_idx + offset + 1].strip()
                        tax_codes = re.findall(r'([A-Z0-9]{15,20})', tax_line)
                        if len(tax_codes) >= 2:
                            logging.info(f'从两列税号行提取: 购买方={tax_codes[0]}, 销售方={tax_codes[-1]}')
                    break
        # Reject bank-related matches.
        if '银行' not in candidate and '账号' not in candidate and len(candidate) >= 2:
            # Two-column continuation: the line after the label holds both a
            # buyer and a seller marker, and the seller's part is on the right.
            if line_idx + 1 < len(lines):
                next_line = lines[line_idx + 1].strip()
                if '买' in next_line and '售' in next_line:
                    right_col_match = re.search(r'售\s*([^\s]+(?:\s+[^\s]+)*?)$', next_line)
                    if right_col_match:
                        right_content = right_col_match.group(1).strip()
                        if right_content and len(right_content) <= 15:
                            right_content_no_space = right_content.replace(' ', '')
                            if re.match(r'^[\u4e00-\u9fa5\d()\(\)]+$', right_content_no_space):
                                candidate = candidate + right_content_no_space
            seller_name = candidate

    # Strategy 2: "销售方名称:" / "销方名称:" / "售方名称:" label.
    if not seller_name:
        pattern2 = re.search(r'(?:销售方|销方|售方)\s*名称[::][ \t]*([^\n]*)', text)
        if pattern2:
            candidate = pattern2.group(1).strip()
            candidate = re.split(r'[;;\n]', candidate)[0].strip()
            if '银行' not in candidate and '账号' not in candidate:
                seller_name = merge_cross_line_company(candidate, line_index_at(pattern2.start()))

    # Strategy 3: "名称:<buyer> ... 销 名称:<seller>" style lines.
    if not seller_name:
        for match in re.finditer(r'名称[::][ \t]*([^\n]+)', text):
            full_match_text = match.group(0)  # includes the "名称:" label
            matched_value = match.group(1).strip()
            # Only lines carrying a seller marker (and no buyer word) qualify.
            if ('销' in full_match_text or '售' in full_match_text) and '购买' not in full_match_text:
                seller_marker = '销' if '销' in full_match_text else '售'
                pos_in_match = full_match_text.find(seller_marker)
                if pos_in_match < full_match_text.find(matched_value[:10]):
                    # The marker precedes the value: the value is the seller.
                    candidate = matched_value
                else:
                    # The marker is inside the value: take what follows
                    # "销/售 名称:".
                    split_parts = re.split(r'\s*[销售]\s*名称[::]\s*', matched_value)
                    if len(split_parts) > 1:
                        candidate = split_parts[-1].strip()
                    else:
                        after_seller_marker = re.search(r'[销售]\s*名称[::]\s*(.+)', full_match_text)
                        if after_seller_marker:
                            candidate = after_seller_marker.group(1).strip()
                        else:
                            candidate = matched_value
                # Clean-up
                candidate = re.split(r'[;;\n]', candidate)[0].strip()
                candidate = re.sub(r'\s*(银行|账号|:|:).*', '', candidate)
                # Accept only strings that look like a company name.
                if re.search(r'(公司|有限|加油站|石油|服务区|能源|贸易|石化|油站|股份|超市|商店)', candidate):
                    if '银行' not in candidate and '账号' not in candidate:
                        seller_name = merge_cross_line_company(candidate, line_index_at(match.start()))
                        break

    # Strategy 4: by position - the seller section comes after the buyer's.
    if not seller_name:
        buyer_pos = max(
            text.find('购买方'),
            text.find('购方'),
            text.find('买方')
        )
        seller_pos = max(
            text.find('销售方'),
            text.find('销方'),
            text.find('售方'),
            text.find('\n销'),  # a lone "销" at the start of a line
            text.find('\n售')  # a lone "售" at the start of a line
        )
        if seller_pos > buyer_pos and buyer_pos >= 0:
            text_after_seller = text[seller_pos:]
            # First company-looking string after the seller marker.
            company_match = re.search(r'([\u4e00-\u9fa5]{2,}(?:加油站|石油|服务区|能源|贸易|石化|油站|股份|有限|公司|超市|商店)[^\n;;]*)', text_after_seller)
            if company_match:
                candidate = company_match.group(1).strip()
                candidate = re.sub(r'\s*(银行|账号).*', '', candidate)
                if '银行' not in candidate and '账号' not in candidate:
                    match_start = seller_pos + company_match.start()
                    seller_name = merge_cross_line_company(candidate, line_index_at(match_start))

    # Strategy 5: longest company-looking string in the second half of the text.
    if not seller_name:
        half_text = text[len(text)//2:]
        companies = re.findall(r'([\u4e00-\u9fa5]{2,}(?:加油站|石油|服务区|能源|贸易|石化|油站|股份|有限|公司|超市|商店)[^\n;;]*)', half_text)
        valid_companies = []
        for company in companies:
            company = company.strip()
            company = re.sub(r'\s*(银行|账号).*', '', company)
            # Skip the buyer itself.
            if buyer_name and buyer_name in company:
                continue
            # Skip bank details.
            if '银行' in company or '账号' in company:
                continue
            # Skip strings carrying a buyer keyword.
            if '购买' in company or '购方' in company or '买方' in company:
                continue
            if company:
                valid_companies.append(company)
        if valid_companies:
            best_company = max(valid_companies, key=len)
            company_pos = text.find(best_company)
            if company_pos >= 0:
                seller_name = merge_cross_line_company(best_company, line_index_at(company_pos))
            else:
                # The cleaned string no longer occurs verbatim in the text.
                seller_name = best_company

    # Final clean-up of the seller name.
    if seller_name:
        # Company names are stored without any whitespace.
        seller_name = re.sub(r'\s+', '', seller_name)
        # Remove residue of the two-column layout.
        seller_name = re.sub(r'买售.*$', '', seller_name)
        seller_name = re.sub(r'方方.*$', '', seller_name)
        # Remove label prefixes and leading digits.
        seller_name = re.sub(r'^名称[::]', '', seller_name)
        seller_name = re.sub(r'^销售方', '', seller_name)
        seller_name = re.sub(r'^销方', '', seller_name)
        seller_name = re.sub(r'^销\s*', '', seller_name)
        seller_name = re.sub(r'^\d+\s*', '', seller_name)
        # Remove trailing digits and punctuation.
        seller_name = re.sub(r'[\d\s\-_;;,,]+$', '', seller_name)
        # Remove characters that are not allowed in file names.
        seller_name = re.sub(r'[<>:"/\\|?*]', '', seller_name)
        # Cap the length.
        if len(seller_name) > 80:
            seller_name = seller_name[:80]
    return {"seller": seller_name if seller_name else "未识别", "buyer": buyer_name}
def extract_seller_tax_id(text):
    """Extract the seller's and the buyer's taxpayer ID from invoice text.

    Four strategies are tried for the seller, in order, until one yields an
    ID:
      1. a labelled line holding two IDs (left = buyer, right = seller), or a
         labelled line holding one ID that has a seller marker in the five
         lines before it,
      2. the first labelled ID within 500 characters after a
         "销/售 名称:" label,
      3. all labelled IDs in the text (first = buyer, last = seller),
      4. all stand-alone 18-character codes (first = buyer, last = seller).

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict ``{"seller": str, "buyer": str}``. ``seller`` is "未识别" when no
        ID was found; ``buyer`` is "" when no buyer ID was found.
    """
    tax_id = ""
    buyer_tax_id = ""
    lines = text.split('\n')

    # Strategy 1: line-by-line scan of labelled lines.
    # enumerate() supplies the real position of each line; looking the line
    # up by value would return the first identical line and check the wrong
    # context when the same line text occurs twice.
    for line_idx, line in enumerate(lines):
        if '统一社会信用代码' not in line and '纳税人识别号' not in line:
            continue
        # Candidate IDs: 15-20 upper-case letters/digits.
        tax_ids = re.findall(r'([A-Z0-9]{15,20})', line)
        if len(tax_ids) >= 2:
            # Two-column layout: right column is the seller, left the buyer.
            tax_id = tax_ids[-1]
            buyer_tax_id = tax_ids[0]
            logging.info(f'两列布局税号,购买方: {buyer_tax_id}, 销售方: {tax_id}')
            break
        if len(tax_ids) == 1:
            # Single ID: accept it only when one of the previous five lines
            # carries a seller marker and no buyer marker.
            for previous in lines[max(0, line_idx - 5):line_idx]:
                if ('销' in previous or '售' in previous) and '购' not in previous:
                    tax_id = tax_ids[0]
                    break
            if tax_id:
                break

    # Strategy 2: first labelled ID shortly after the seller-name label.
    if not tax_id:
        seller_match = re.search(r'[销售]\s*名称[::][ \t]*([^\n]*)', text)
        if seller_match:
            seller_pos = seller_match.end()
            text_after_seller = text[seller_pos:seller_pos+500]
            tax_match = re.search(r'(?:统一社会信用代码|纳税人识别号)[::/]*\s*([A-Z0-9]{15,20})', text_after_seller)
            if tax_match:
                tax_id = tax_match.group(1)

    # Strategy 3: all labelled IDs; first is the buyer, last the seller.
    if not tax_id:
        all_tax_ids = re.findall(r'(?:统一社会信用代码|纳税人识别号)[::/]*\s*([A-Z0-9]{15,20})', text)
        if len(all_tax_ids) >= 2:
            tax_id = all_tax_ids[-1]
            if not buyer_tax_id:
                buyer_tax_id = all_tax_ids[0]
        elif len(all_tax_ids) == 1:
            tax_id = all_tax_ids[0]

    # Strategy 4: unlabelled 18-character codes. The look-arounds keep
    # fragments of longer digit runs (e.g. invoice numbers) from matching.
    if not tax_id:
        credit_codes = re.findall(r'(?<!\d)([A-Z0-9]{18})(?!\d)', text)
        if credit_codes:
            tax_id = credit_codes[-1]
            if len(credit_codes) >= 2 and not buyer_tax_id:
                buyer_tax_id = credit_codes[0]

    return {"seller": tax_id if tax_id else "未识别", "buyer": buyer_tax_id}
def extract_amounts(text):
    """Extract net amount, tax amount, gross total and tax rate from invoice text.

    Several strategies are combined: the "合计" summary line, the "价税合计"
    total, the per-item lines (those containing '*'), and the "金额" / "税额"
    keywords. Missing values are then derived from the ones that were found.

    Args:
        text: Plain text extracted from the PDF.

    Returns:
        dict with the float values ``amount``, ``tax_amount``,
        ``total_amount`` and ``tax_rate`` (percent), each rounded to two
        decimals; values that could not be determined are 0.0.
    """
    amount = 0.0  # net amount (excluding tax)
    tax_amount = 0.0
    total_amount = 0.0  # gross total (amount + tax)
    tax_rates = []  # rates printed on item lines; averaged at the end
    lines = text.split('\n')
    money_pattern = r'[¥¥]?\s*(\d+(?:\.\d{1,2})?)'

    # Step 1: the summary line ("合 计" / "合计"), read first.
    for i, line in enumerate(lines):
        line_stripped = line.strip()
        if re.match(r'^合\s*计', line_stripped) or '合\xa5' in line or '合计' in line_stripped:
            numbers = re.findall(money_pattern, line)
            # No number on this line: look at the previous, then the next line.
            if not numbers and i > 0:
                numbers = re.findall(money_pattern, lines[i-1])
            if not numbers and i + 1 < len(lines):
                numbers = re.findall(money_pattern, lines[i+1])
            # Values <= 1 are discarded (meant to drop tax rates and similar
            # small figures). The regex only yields digit strings, so
            # float() cannot fail.
            valid_nums = [val for val in map(float, numbers) if val > 1]
            if len(valid_nums) >= 2:
                # Largest value is the amount, second largest the tax.
                valid_nums.sort(reverse=True)
                amount = valid_nums[0]
                tax_amount = valid_nums[1]
                logging.info(f'从合计行提取: 金额={amount}, 税额={tax_amount}')
                break
            elif len(valid_nums) == 1:
                amount = valid_nums[0]
                logging.info(f'从合计行提取金额: {amount}')
                break

    # Step 2: the gross total ("价税合计"). Patterns are tried in order; for
    # the first pattern that matches, the last positive value is used.
    total_patterns = [
        r'(?:价税合计|价 税 合 计)[^\d¥¥]*[¥¥]?\s*(\d+(?:\.\d{1,2})?)',
        r'价税合计[((]小写[))][^\d¥¥]*[¥¥]?\s*(\d+(?:\.\d{1,2})?)',
        r'小写[^\d¥¥]*[¥¥]?\s*(\d+(?:\.\d{1,2})?)',
        r'价税合计.*?[¥¥]?\s*(\d+(?:\.\d{1,2})?)',
    ]
    for pattern in total_patterns:
        matches = re.findall(pattern, text)
        if matches:
            for match in reversed(matches):
                val = float(match)
                if val > 0:
                    total_amount = val
                    break
            if total_amount > 0:
                break

    # Step 3: when the summary line did not give both values, add up the item
    # lines (lines containing '*' and a two-decimal number).
    if amount == 0 or tax_amount == 0:
        goods_amounts = []  # net amount of each item line
        goods_taxes = []  # tax of each item line
        for line in lines:
            if '*' in line and re.search(r'\d+\.\d{2}', line):
                # Printed tax rate such as 13%, 9%, 6% - the whole number is
                # captured, including a fractional part ("1.5%" -> 1.5).
                rate_match = re.search(r'(\d+(?:\.\d+)?)\s*%', line)
                if rate_match:
                    tax_rates.append(float(rate_match.group(1)))
                # All two-decimal numbers, possibly negative (deductions).
                numbers = re.findall(r'(-?\d+\.\d{2})', line)
                if len(numbers) >= 2:
                    nums = [float(n) for n in numbers]
                    # Ignore values whose magnitude is 0.01 or less.
                    valid_nums = [n for n in nums if abs(n) > 0.01]
                    if len(valid_nums) >= 2:
                        # The last two numbers are taken as amount and tax.
                        goods_amounts.append(valid_nums[-2])
                        goods_taxes.append(valid_nums[-1])
                    elif len(valid_nums) == 1:
                        goods_amounts.append(valid_nums[0])
        if goods_amounts:
            if amount == 0:
                amount = sum(goods_amounts)
            if tax_amount == 0 and goods_taxes:
                tax_amount = sum(goods_taxes)
            logging.info(f'从货物行累加: 金额={amount}, 税额={tax_amount}, 税率列表={tax_rates}')

    # Step 4: the "金额" / "税额" keywords.
    if amount == 0:
        amount_match = re.search(r'(?:金额|金 额)[^\d]*(\d+(?:\.\d{1,2})?)', text)
        if amount_match:
            val = float(amount_match.group(1))
            if val > 0:
                amount = val
    if tax_amount == 0:
        tax_match = re.search(r'(?:税额|税 额)[^\d]*(\d+(?:\.\d{1,2})?)', text)
        if tax_match:
            val = float(tax_match.group(1))
            if val > 0:
                tax_amount = val

    # Step 5: only the gross total is known - derive amount and tax from it.
    # NOTE(review): the rate cannot be determined from the total alone. The
    # earlier code iterated over 13/9/6/3 % with a plausibility check that
    # was always true, so 13 % was always chosen; that outcome is kept.
    if total_amount > 0 and amount == 0 and tax_amount == 0:
        assumed_rate = 0.13
        calculated_amount = total_amount / (1 + assumed_rate)
        amount = round(calculated_amount, 2)
        tax_amount = round(total_amount - calculated_amount, 2)

    # Step 6: cross-check, or fill in the one missing value.
    if amount > 0 and tax_amount > 0:
        calculated_total = round(amount + tax_amount, 2)
        if total_amount == 0:
            total_amount = calculated_total
        elif abs(calculated_total - total_amount) > 0.1:
            # The figures disagree; they are reported but not altered.
            logging.warning(f'金额校验异常: 金额{amount} + 税额{tax_amount} = {calculated_total}, 价税合计{total_amount}')
    elif amount > 0 and tax_amount == 0 and total_amount > 0:
        tax_amount = round(total_amount - amount, 2)
    elif amount == 0 and tax_amount > 0 and total_amount > 0:
        amount = round(total_amount - tax_amount, 2)

    # Step 7: last resort for the total.
    if total_amount == 0 and amount > 0:
        total_amount = amount + tax_amount

    # Tax rate: average of the printed rates, else derived from the amounts.
    if tax_rates:
        tax_rate = sum(tax_rates) / len(tax_rates)
    elif amount > 0 and tax_amount > 0:
        tax_rate = round(tax_amount / amount * 100, 2)
    elif total_amount > 0 and amount > 0 and amount < total_amount:
        tax_rate = round((total_amount - amount) / amount * 100, 2)
    else:
        tax_rate = 0.0

    amount = round(amount, 2)
    tax_amount = round(tax_amount, 2)
    total_amount = round(total_amount, 2)
    tax_rate = round(tax_rate, 2)
    logging.info(f'金额提取: 金额={amount}, 税额={tax_amount}, 价税合计={total_amount}, 税率={tax_rate}%')
    return {
        "amount": amount,
        "tax_amount": tax_amount,
        "total_amount": total_amount,
        "tax_rate": tax_rate
    }
def read_pdf_content(file_path):
    """Read one PDF and return the invoice fields found in its text.

    The text of all pages is concatenated and handed, in this order, to the
    patent-fee, railway, airline and itinerary extractors; the first one that
    recognises the document decides the result. Otherwise the text is parsed
    as a regular invoice.

    Args:
        file_path: Path of the PDF file.

    Returns:
        dict with the keys amount, tax_amount, total_amount, tax_rate,
        invoice_number, name (seller), tax_id (seller), buyer_name,
        buyer_tax_id, date, category and project_name. If any exception is
        raised while reading or parsing, a dict with zero/empty values and
        name "读取失败" is returned instead.
    """
    logging.info(f'开始处理文件: {os.path.basename(file_path)}')
    try:
        with pdfplumber.open(file_path) as pdf:
            full_text = ""
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    full_text += page_text + "\n"
        # Log the text length; no text at all means a scanned (image-only) PDF.
        # Processing continues regardless and simply finds nothing.
        logging.info(f'提取文本长度: {len(full_text)}')
        if len(full_text.strip()) == 0:
            logging.warning(f'扫描版PDF(无文字信息),无法提取数据: {os.path.basename(file_path)}')
        # 1) Patent annual-fee receipt
        patent_data = extract_patent_fee_data(full_text)
        if patent_data["is_patent"]:
            return {
                "amount": patent_data["amount"],
                "tax_amount": patent_data["tax_amount"],
                "total_amount": patent_data["total_amount"],
                "tax_rate": 0.0,
                "invoice_number": patent_data["invoice_number"],
                "name": patent_data["seller_name"],
                "tax_id": patent_data["tax_id"],
                "buyer_name": "",
                "buyer_tax_id": "",
                "date": normalize_date(patent_data["date"]),
                "category": patent_data["category"],
                "project_name": ""
            }
        # 2) Railway e-ticket
        railway_data = extract_railway_ticket_data(full_text)
        if railway_data["is_railway"]:
            return {
                "amount": railway_data["amount"],
                "tax_amount": railway_data["tax_amount"],
                "total_amount": railway_data["total_amount"],
                "tax_rate": 0.0,
                "invoice_number": railway_data["invoice_number"],
                "name": railway_data["seller_name"],
                "tax_id": railway_data["tax_id"],
                "buyer_name": railway_data.get("buyer_name", ""),
                "buyer_tax_id": railway_data.get("buyer_tax_id", ""),
                "date": normalize_date(railway_data["date"]),
                "category": railway_data["category"],
                "project_name": ""
            }
        # 3) Airline e-ticket itinerary
        airline_data = extract_airline_ticket_data(full_text)
        if airline_data["is_airline"]:
            return {
                "amount": airline_data["amount"],
                "tax_amount": airline_data["tax_amount"],
                "total_amount": airline_data["total_amount"],
                "tax_rate": airline_data.get("tax_rate", 0.0),
                "invoice_number": airline_data["invoice_number"],
                "name": airline_data["seller_name"],
                "tax_id": airline_data["tax_id"],
                "buyer_name": airline_data.get("buyer_name", ""),
                "buyer_tax_id": airline_data.get("buyer_tax_id", ""),
                "date": normalize_date(airline_data["date"]),
                "category": airline_data["category"],
                "project_name": ""
            }
        # 4) Generic (non-airline) itinerary; the rate is derived from tax / amount.
        itinerary_data = extract_travel_itinerary_data(full_text)
        if itinerary_data["is_itinerary"]:
            itin_tax_rate = round(itinerary_data["tax_amount"] / itinerary_data["amount"] * 100, 2) if itinerary_data["amount"] > 0 and itinerary_data["tax_amount"] > 0 else 0.0
            return {
                "amount": itinerary_data["amount"],
                "tax_amount": itinerary_data["tax_amount"],
                "total_amount": itinerary_data["total_amount"],
                "tax_rate": itin_tax_rate,
                "invoice_number": itinerary_data["invoice_number"],
                "name": itinerary_data["seller_name"],
                "tax_id": itinerary_data["tax_id"] if itinerary_data["tax_id"] else "行程单无税号",
                "buyer_name": "",
                "buyer_tax_id": "",
                "date": normalize_date(itinerary_data["date"]),
                "category": itinerary_data["category"],
                "project_name": ""
            }
        # Debug aid: log the lines around the first line that mentions both
        # a seller marker and "名称".
        lines = full_text.split('\n')
        for i, line in enumerate(lines):
            if '销' in line and '名称' in line:
                start_idx = max(0, i - 2)
                end_idx = min(len(lines), i + 8)
                context = '\n'.join([f"L{j}: {lines[j]}" for j in range(start_idx, end_idx)])
                logging.info(f'销售方区域文本:\n{context}')
                break
        # Invoice number: labelled 8-20 digit number, else any 20-digit run.
        invoice_number = re.findall(r'(?:发票号码|发票代码|发票号)[\s]*[::]*\s*([0-9]{8,20})', full_text)
        if not invoice_number:
            invoice_number = re.findall(r'([0-9]{20})', full_text)
        # Seller and buyer names
        seller_info = extract_seller_name(full_text)
        seller_name = seller_info["seller"]
        buyer_name = seller_info["buyer"]
        # Seller and buyer taxpayer IDs
        tax_info = extract_seller_tax_id(full_text)
        seller_tax_id = tax_info["seller"]
        buyer_tax_id = tax_info["buyer"]
        # Date: try several layouts; the first occurrence is used further below.
        date = re.findall(r'(\d{4}\s*年\s*\d{1,2}\s*月\s*\d{1,2}\s*日)', full_text)
        if not date:
            date = re.findall(r'(\d{4}-\d{1,2}-\d{1,2})', full_text)
        if not date:
            date = re.findall(r'(\d{4}年\d{1,2}月\d{1,2}日)', full_text)
        if not date:
            date = re.findall(r'(\d{4}/\d{1,2}/\d{1,2})', full_text)
        # Category (goods class) and project name.
        # Item names are written as *class*item, e.g. *汽油*汽95# or *供电*电费:
        # the text between the first pair of asterisks is the category, and
        # what follows (up to the next whitespace or asterisk) is the project name.
        category = ""
        project_name = ""
        # Preferred: the *class*item pair.
        goods_pair = re.search(r'\*([^*]+)\*', full_text)
        if goods_pair:
            category = goods_pair.group(1).strip()
            # Start after the second asterisk, skip leading whitespace, and
            # stop at the next whitespace / asterisk (or at the end of text).
            rest = full_text[goods_pair.end():].lstrip()
            end_match = re.search(r'[\s\n\r*]', rest)
            if end_match:
                project_name = rest[:end_match.start()].strip()
            else:
                project_name = rest.strip()
            logging.info(f'提取项目(成对匹配): 分类={category}, 项目={project_name}')
        if not category:
            # Second choice: any *xxx* fragment.
            goods_matches = re.findall(r'\*([^*]+)\*', full_text)
            if goods_matches:
                category = goods_matches[0].strip()
                project_name = category
                logging.info(f'提取项目(单匹配): 分类={category}')
        if not category:
            # Last choice: the line after the "goods or taxable services" header.
            service_match = re.search(r'货物或应税劳务、服务名称[^\n]*\n([^\n]+)', full_text)
            if service_match:
                category = service_match.group(1).strip()
                project_name = category
        # Net amount, tax amount, gross total and rate
        amounts = extract_amounts(full_text)
        logging.info(f'成功提取: 金额={amounts["amount"]}, 税额={amounts["tax_amount"]}, 价税合计={amounts["total_amount"]}, 税率={amounts["tax_rate"]}%, 发票号码={invoice_number[0] if invoice_number else "未找到"}, 销售方={seller_name}, 销售方税号={seller_tax_id}, 购买方={buyer_name}')
    except Exception as e:
        # Any failure while reading or parsing yields a placeholder record.
        logging.error(f"读取PDF文件失败 {file_path}: {e}")
        return {
            "amount": 0.0,
            "tax_amount": 0.0,
            "total_amount": 0.0,
            "tax_rate": 0.0,
            "invoice_number": "",
            "name": "读取失败",
            "tax_id": "",
            "buyer_name": "",
            "buyer_tax_id": "",
            "date": None,
            "category": "",
            "project_name": ""
        }
    # Use the gross total as the amount when no net amount was found.
    final_amount = amounts["amount"]
    final_total = amounts["total_amount"]
    if final_amount == 0 and final_total > 0:
        final_amount = final_total
    return {
        "amount": final_amount,
        "tax_amount": amounts["tax_amount"],
        "total_amount": amounts["total_amount"],
        "tax_rate": amounts["tax_rate"],
        "invoice_number": invoice_number[0] if invoice_number else "",
        "name": seller_name,
        "tax_id": seller_tax_id,
        "buyer_name": buyer_name,
        "buyer_tax_id": buyer_tax_id,
        "date": normalize_date(date[0]) if date else None,
        "category": category,
        "project_name": project_name
    }
def get_pdf_files(pdf_dir):
    """Recursively collect every PDF file found under *pdf_dir*.

    A file counts as a PDF when its name ends with ``.pdf``, compared
    case-insensitively. Paths are returned normalized with
    ``os.path.normpath`` in the order ``os.walk`` yields them.

    Args:
        pdf_dir: Directory to scan, including all of its subdirectories.

    Returns:
        A list of normalized file paths; empty when nothing matches.
    """
    logging.info('开始扫描PDF文件')
    found = [
        os.path.normpath(os.path.join(folder, name))
        for folder, _subdirs, names in os.walk(pdf_dir)
        for name in names
        if name.lower().endswith(".pdf")
    ]
    logging.info(f'找到 {len(found)} 个PDF文件')
    return found
def rename_pdf_file(file_path, new_value):
    """Rename a PDF inside its own directory and return the resulting path.

    The new base name is sanitized (characters that are illegal in Windows
    file names and ASCII control characters become ``_``) and truncated to
    200 characters before ``.pdf`` is appended. If another file already uses
    the target name, ``_1``, ``_2``, ... is appended until a free name is
    found.

    If the file already carries the requested name, nothing is renamed and
    the original path is returned. Without this check the file would collide
    with itself and be renamed to ``<name>_1.pdf``.

    Args:
        file_path: Current path of the file.
        new_value: Desired base name, without the ``.pdf`` extension.

    Returns:
        The path of the file after the operation.

    Raises:
        OSError: Propagated from ``os.rename`` (e.g. source missing or locked).
    """
    logging.info(f'重命名文件: {file_path}')
    dir_path = os.path.dirname(file_path)

    # Replace characters that are not allowed in file names: the Windows
    # reserved punctuation plus the ASCII control characters (0-31).
    invalid_chars = '<>:"/\\|?*' + ''.join(map(chr, range(32)))
    new_value = new_value.translate(str.maketrans(invalid_chars, '_' * len(invalid_chars)))

    # Keep the base name at a length file systems will accept.
    new_value = new_value[:200]

    # Normalized form of the source path, used to recognize "target == source".
    source_key = os.path.normcase(os.path.abspath(file_path))

    new_file_path = os.path.join(dir_path, f"{new_value}.pdf")
    counter = 1
    while os.path.exists(new_file_path):
        if os.path.normcase(os.path.abspath(new_file_path)) == source_key:
            # The existing file at the target name is the source itself.
            logging.info(f'文件名未变化,跳过重命名: {file_path}')
            return file_path
        # Name is taken by a different file: try the next numeric suffix.
        new_file_path = os.path.join(dir_path, f"{new_value}_{counter}.pdf")
        counter += 1

    os.rename(file_path, new_file_path)
    logging.info(f'重命名完成: {new_file_path}')
    return new_file_path
def open_pdf(path):
    """Open *path* with the operating system's default application.

    Windows uses ``os.startfile``; macOS runs ``open``; every other platform
    runs ``xdg-open``.

    ``os.name`` is ``'posix'`` on both macOS and Linux, so it cannot tell the
    two apart; ``sys.platform`` is used for that distinction instead.

    Args:
        path: Path of the file to open.
    """
    # Imported locally so this function does not depend on the file's
    # top-level import list.
    import sys

    if os.name == 'nt':  # Windows
        os.startfile(path)
    elif sys.platform == 'darwin':  # macOS
        subprocess.call(['open', path])
    else:  # Linux and other POSIX desktops
        subprocess.call(['xdg-open', path])
def center_window(root, width=600, height=250):
    """Size a window to *width* x *height* and center it on the screen.

    The same dimensions are also applied as the window's minimum size.

    Args:
        root: Window object providing ``winfo_screenwidth``,
            ``winfo_screenheight``, ``geometry`` and ``minsize``.
        width: Window width in pixels.
        height: Window height in pixels.
    """
    # Offset of the top-left corner that leaves equal margins on both sides.
    left = (root.winfo_screenwidth() - width) // 2
    top = (root.winfo_screenheight() - height) // 2
    root.geometry(f"{width}x{height}+{left}+{top}")
    root.minsize(width, height)
def display_results(values, total_amount_sum, total_tax_sum, total_sum, input_root, pdf_source_folder):
    """Show the extracted invoice rows in a sortable table and run its event loop.

    Destroys ``input_root``, creates a new Tk root window containing a
    Treeview with one row per invoice, and offers buttons to rename the
    selected PDFs, copy the totals to the clipboard, export the table to an
    ``.xls`` file and quit. Double-clicking a row opens its PDF. The call
    blocks in ``mainloop`` until the window is closed.

    Args:
        values: Sequence of 12-item rows laid out as
            ``(amount, tax_rate, tax_amount, total_amount, invoice_number,
            seller_name, seller_tax_id, buyer_name, buyer_tax_id,
            project_name, date, pdf_file)``. ``date`` may be a
            ``datetime.date``, ``None`` or any other value (shown via ``str``).
        total_amount_sum: Sum of the tax-exclusive amounts, shown in the footer.
        total_tax_sum: Sum of the tax amounts, shown in the footer.
        total_sum: Sum of the tax-inclusive totals, shown in the footer.
        input_root: The input window's Tk root; destroyed on entry.
        pdf_source_folder: Initial directory offered by the export dialog.
    """
    # The input window's root is destroyed before the result window is built.
    input_root.destroy()

    # Original date value of each row, keyed by Treeview item id. The export
    # needs real date objects, and keying by item id (rather than by row
    # position) keeps every date attached to its own row after sorting.
    raw_dates = {}

    # Create the new root window.
    root = tk.Tk()
    root.title("发票金额统计结果")
    center_window(root, width=1700, height=700)
    root.minsize(1200, 600)

    # Main frame layout.
    main_frame = ttk.Frame(root, padding="10")
    main_frame.grid(column=0, row=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    main_frame.columnconfigure(0, weight=1)
    main_frame.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    root.rowconfigure(0, weight=1)

    def rename_selected_files():
        """Ask which fields form the new file name, then rename the selected PDFs."""
        selected_items = tree.selection()
        if not selected_items:
            messagebox.showerror("错误", "请先选择一个或多个PDF文件进行重命名")
            return

        # Modal dialog for choosing the file-name fields.
        dialog = Toplevel(root)
        dialog.title("选择需要的字段")
        dialog.geometry("350x500")
        dialog.transient(root)
        dialog.grab_set()

        frame = ttk.Frame(dialog, padding="20")
        frame.pack(fill='both', expand=True)
        ttk.Label(frame, text="请选择要包含在文件名中的字段:", font=('Arial', 10, 'bold')).pack(pady=(0, 10))

        # One entry per checkbox, in the order the parts appear in the name:
        # (label, default state, tree column index, file-name prefix,
        #  placeholder value that must not be used in a name).
        field_specs = (
            ('金额(不含税)', 'no', 1, "金额", None),
            ('税额', 'no', 3, "税额", None),
            ('价税合计', 'yes', 4, "", None),
            ('项目名称', 'no', 10, "", None),
            ('发票号码', 'yes', 5, "", None),
            ('销售方名称', 'yes', 6, "", "未识别"),
            ('销售方纳税人识别号', 'no', 7, "", "未识别"),
            ('购买方名称', 'no', 8, "", None),
            ('购买方纳税人识别号', 'no', 9, "", None),
            ('开票日期', 'no', 11, "", None),
        )
        choices = []
        for label, default, column_index, prefix, placeholder in field_specs:
            var = StringVar(value=default)
            Checkbutton(frame, text=label, variable=var, onvalue='yes', offvalue='no', anchor='w').pack(fill='x', pady=3)
            choices.append((var, column_index, prefix, placeholder))

        def on_ok():
            """Rename every selected file and report how many succeeded."""
            renamed_count = 0
            failed_count = 0
            for item in selected_items:
                item_values = tree.item(item, 'values')
                new_name_parts = [
                    f"{prefix}{item_values[column_index]}"
                    for var, column_index, prefix, placeholder in choices
                    if var.get() == 'yes'
                    and item_values[column_index]
                    and item_values[column_index] != placeholder
                ]
                if not new_name_parts:
                    continue
                current_file_path = item_values[12]
                try:
                    new_file_path = rename_pdf_file(current_file_path, "_".join(new_name_parts))
                except OSError as e:
                    # One file that cannot be renamed must not abort the
                    # batch or leave the modal dialog open.
                    logging.error(f'重命名失败 {current_file_path}: {e}')
                    failed_count += 1
                    continue
                tree.set(item, column="文件路径", value=new_file_path)
                renamed_count += 1
            message = f"成功重命名 {renamed_count} 个文件。"
            if failed_count:
                message += f"\n有 {failed_count} 个文件重命名失败,详细信息请查看 app.log 文件。"
            messagebox.showinfo("完成", message)
            dialog.destroy()

        Button(frame, text='确定', command=on_ok, width=15).pack(pady=20)

    def export_to_xls():
        """Write the table, in its current display order, to an .xls file."""
        # Let the user choose where to save.
        save_path = filedialog.asksaveasfilename(
            title="保存Excel文件",
            defaultextension=".xls",
            filetypes=[("Excel files", "*.xls"), ("All files", "*.*")],
            initialfile="发票数据.xls",
            initialdir=pdf_source_folder
        )
        if not save_path:
            return
        try:
            workbook = xlwt.Workbook(encoding='utf-8')
            sheet = workbook.add_sheet('发票数据')

            # Column widths (xlwt measures width in 1/256 of a character).
            col_widths = [8, 15, 10, 15, 18, 25, 30, 25, 30, 25, 20, 15, 50]
            for i, width in enumerate(col_widths):
                sheet.col(i).width = 256 * width

            style = xlwt.easyxf('align: vert centre, horiz centre')
            style_text = xlwt.easyxf('align: vert centre, horiz left')
            # Styles for rows tagged 'special' (non-standard invoices).
            style_special = xlwt.easyxf('align: vert centre, horiz centre; pattern: pattern solid, fore_colour light_yellow')
            style_special_text = xlwt.easyxf('align: vert centre, horiz left; pattern: pattern solid, fore_colour light_yellow')
            # Date styles, so the cells are formatted as yyyy-mm-dd dates.
            style_date = xlwt.easyxf('align: vert centre, horiz centre', num_format_str='yyyy-mm-dd')
            style_date_special = xlwt.easyxf('align: vert centre, horiz centre; pattern: pattern solid, fore_colour light_yellow', num_format_str='yyyy-mm-dd')

            headers = ["序号", "金额(不含税)", "税率", "税额", "价税合计", "发票号码", "销售方名称", "销售方税号", "购买方名称", "购买方税号", "项目名称", "开票日期", "文件路径"]
            for i, header in enumerate(headers):
                sheet.write(0, i, header, style)

            for row_index, item in enumerate(tree.get_children(), start=1):
                row_values = tree.item(item, 'values')
                is_special = 'special' in tree.item(item, 'tags')
                row_style = style_special if is_special else style
                row_style_text = style_special_text if is_special else style_text
                for j, value in enumerate(row_values):
                    if j == 12:  # file path column, left aligned
                        sheet.write(row_index, j, str(value), row_style_text)
                    elif j == 11:  # invoice date column
                        # The tree only holds the formatted string; fetch the
                        # original value recorded for this exact item.
                        raw_date = raw_dates.get(item)
                        if isinstance(raw_date, datetime.date):
                            date_style = style_date_special if is_special else style_date
                            sheet.write(row_index, j, raw_date, date_style)
                        elif raw_date:
                            # Present but not a date object: write as text.
                            sheet.write(row_index, j, str(raw_date), row_style)
                        else:
                            sheet.write(row_index, j, "", row_style)
                    else:
                        sheet.write(row_index, j, str(value), row_style)

            workbook.save(save_path)
            messagebox.showinfo("完成", f"数据成功导出至\n{save_path}")
        except Exception as e:
            messagebox.showerror("错误", f"导出失败:{str(e)}")

    def copy_total_amount_to_clipboard():
        """Replace the clipboard content with the three totals."""
        root.clipboard_clear()
        root.clipboard_append(f"不含税合计: {total_amount_sum:.2f} 元\n税额合计: {total_tax_sum:.2f} 元\n价税合计: {total_sum:.2f} 元")
        messagebox.showinfo("成功", "总金额统计已复制到剪贴板")

    def open_clicked_row(event):
        """Open the PDF of the row under the mouse pointer, if there is one."""
        # Resolve the row from the click position instead of the selection:
        # the selection can be empty (click on a heading or blank area) or
        # hold several rows, and neither identifies a single file.
        item_id = tree.identify_row(event.y)
        if not item_id:
            return
        pdf_path = tree.set(item_id, "文件路径")
        try:
            open_pdf(pdf_path)
        except OSError as e:
            logging.error(f'打开文件失败 {pdf_path}: {e}')
            messagebox.showerror("错误", f"无法打开文件:{e}")

    # Frame holding the table and its scrollbars.
    tree_frame = ttk.Frame(main_frame)
    tree_frame.grid(column=0, row=0, pady=5, padx=5, sticky=(tk.N, tk.S, tk.E, tk.W))
    tree_frame.columnconfigure(0, weight=1)
    tree_frame.rowconfigure(0, weight=1)
    scrollbar_y = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL)
    scrollbar_x = ttk.Scrollbar(tree_frame, orient=tk.HORIZONTAL)

    # Column id -> display settings; dict order is the column order.
    columns_config = {
        "序号": {"width": 60, "anchor": "center"},
        "金额": {"width": 120, "anchor": "center"},
        "税率": {"width": 80, "anchor": "center"},
        "税额": {"width": 120, "anchor": "center"},
        "价税合计": {"width": 120, "anchor": "center"},
        "发票号码": {"width": 200, "anchor": "center"},
        "销售方名称": {"width": 260, "anchor": "w"},
        "销售方税号": {"width": 200, "anchor": "center"},
        "购买方名称": {"width": 260, "anchor": "w"},
        "购买方税号": {"width": 200, "anchor": "center"},
        "项目名称": {"width": 150, "anchor": "w"},
        "开票日期": {"width": 120, "anchor": "center"},
        "文件路径": {"width": 300, "anchor": "w"}
    }
    tree = ttk.Treeview(tree_frame,
                        columns=tuple(columns_config),
                        show="headings",
                        yscrollcommand=scrollbar_y.set,
                        xscrollcommand=scrollbar_x.set)
    scrollbar_y.config(command=tree.yview)
    scrollbar_x.config(command=tree.xview)
    tree.grid(column=0, row=0, sticky=(tk.N, tk.S, tk.E, tk.W))
    scrollbar_y.grid(column=1, row=0, sticky=(tk.N, tk.S))
    scrollbar_x.grid(column=0, row=1, sticky=(tk.E, tk.W))
    tree.bind('<Double-1>', open_clicked_row)

    # Columns sorted numerically. The row-number column is included so that
    # 10 sorts after 9 rather than after 1.
    numeric_columns = {"序号", "金额", "税率", "税额", "价税合计"}
    for col, config in columns_config.items():
        tree.heading(col, text=col, command=lambda c=col: column_sorter(tree, c, 'num' if c in numeric_columns else 'str'))
        tree.column(col, width=config["width"], anchor=config["anchor"])

    # Yellow background for non-standard invoices (no valid seller tax id).
    tree.tag_configure('special', background='#FFFF99')

    # Seller tax-id values that mark a row as a non-standard invoice.
    invalid_tax_ids = ["", "未识别", "航空客票无税号", "航空客票行程单无税号", "铁路票无税号", "专利无税号", "行程单无税号", "无税号", "读取失败"]

    # Insert the rows.
    for index, value in enumerate(values, start=1):
        # value[6] is the seller tax id; anything shorter than 15 characters
        # or listed above is treated as "no valid tax id".
        tax_id = str(value[6]) if value[6] else ""
        has_no_valid_tax_id = tax_id in invalid_tax_ids or len(tax_id) < 15
        tags = ('special',) if has_no_valid_tax_id else ()

        # The date may be a datetime.date, None, or some other value.
        date_value = value[10]
        if isinstance(date_value, datetime.date):
            # Displayed without leading zeros.
            date_str = f"{date_value.year}年{date_value.month}月{date_value.day}日"
        elif date_value is None:
            date_str = ""
        else:
            date_str = str(date_value)

        item_id = tree.insert("", "end", values=(
            index,
            f"{value[0]:.2f}" if value[0] > 0 else "0.00",
            f"{value[1]:.1f}%" if value[1] > 0 else "0%",
            f"{value[2]:.2f}" if value[2] > 0 else "0.00",
            f"{value[3]:.2f}" if value[3] > 0 else "0.00",
            value[4],
            value[5],
            value[6],
            value[7] if value[7] else "",  # buyer name
            value[8] if value[8] else "",  # buyer tax id
            value[9],  # project name
            date_str,  # invoice date, already formatted for display
            value[11]  # file path
        ), tags=tags)
        raw_dates[item_id] = date_value

    # Button column.
    button_frame = ttk.Frame(main_frame)
    button_frame.grid(column=1, row=0, padx=10, sticky=(tk.N, tk.S))
    buttons = [
        ("重命名选中文件", rename_selected_files),
        ("复制统计金额", copy_total_amount_to_clipboard),
        ("导出到XLS", export_to_xls),
        ("退出", root.destroy)
    ]
    for i, (text, command) in enumerate(buttons):
        btn = ttk.Button(button_frame, text=text, command=command, width=18)
        btn.grid(column=0, row=i, pady=5)

    # Footer with the totals and usage hints.
    info_frame = ttk.Frame(main_frame)
    info_frame.grid(column=0, row=1, columnspan=2, pady=10, sticky=(tk.W, tk.E))
    info_frame.columnconfigure(0, weight=1)
    total_label = ttk.Label(info_frame, text=f"不含税合计: {total_amount_sum:.2f} 元 | 税额合计: {total_tax_sum:.2f} 元 | 价税合计: {total_sum:.2f} 元",
                            font=('Arial', 11, 'bold'), foreground='green')
    total_label.grid(column=0, row=0, sticky=tk.W, padx=5)
    stats_label = ttk.Label(info_frame, text=f"共处理 {len(values)} 张发票 | 双击行可打开PDF文件 | 黄色底色=非标准发票", font=('Arial', 9), foreground='gray')
    stats_label.grid(column=0, row=1, sticky=tk.W, padx=5)

    root.mainloop()
def browse_folder(entry):
    """Let the user pick a folder and put its path into *entry*.

    The entry is left untouched when the dialog returns an empty value.

    Args:
        entry: Entry widget whose content is replaced by the chosen path.
    """
    chosen = filedialog.askdirectory(title="请选择发票PDF文件夹路径")
    if not chosen:
        return
    entry.delete(0, tk.END)
    entry.insert(0, chosen)
def start_processing(entry, input_root):
    """Extract data from every PDF in the chosen folder and show the results.

    Reads the folder path from *entry*, scans it for PDF files, runs
    ``read_pdf_content`` on each one and keeps the files for which a positive
    amount was found. Shows an error and returns when no folder was entered
    or no usable invoice was found; otherwise hands the rows and their totals
    to ``display_results``.

    Also sets the module-level ``pdf_files_folder`` to the directory of the
    first PDF found.

    Args:
        entry: Entry widget holding the folder path.
        input_root: Root window of the input screen, passed on to
            ``display_results``.
    """
    global pdf_files_folder

    logging.info('开始处理')
    folder = entry.get()
    if not folder:
        messagebox.showerror("错误", "请先选择或输入一个文件夹路径")
        return

    pdf_files = get_pdf_files(folder)
    if pdf_files:
        pdf_files_folder = os.path.dirname(pdf_files[0])

    rows = []
    skipped = []
    file_count = len(pdf_files)
    for position, pdf_file in enumerate(pdf_files, 1):
        logging.info(f'处理文件 {position}/{file_count}: {os.path.basename(pdf_file)}')
        info = read_pdf_content(pdf_file)

        # A file is usable when it has a total, or failing that an amount.
        effective_total = info["total_amount"]
        if effective_total == 0 and info["amount"] > 0:
            effective_total = info["amount"]

        if not effective_total > 0:
            skipped.append(pdf_file)
            logging.warning(f'无法提取有效金额: {pdf_file}')
            continue

        # Fall back to the category when no project name was extracted.
        display_project_name = info["project_name"] or info["category"]
        rows.append((
            info["amount"],
            info["tax_rate"],
            info["tax_amount"],
            info["total_amount"],
            info["invoice_number"],
            info["name"],
            info["tax_id"],
            info.get("buyer_name", ""),
            info.get("buyer_tax_id", ""),
            display_project_name,
            info["date"],
            pdf_file
        ))

    if not rows:
        messagebox.showerror("错误", "没有找到有效的发票数据!\n请检查PDF文件是否为有效的电子发票。")
        return

    # Column totals: index 0 = amount, 2 = tax amount, 3 = tax-inclusive total.
    total_amount_sum = sum(row[0] for row in rows)
    total_tax_sum = sum(row[2] for row in rows)
    total_sum = sum(row[3] for row in rows)
    logging.info(f'不含税金额合计: {total_amount_sum}, 税额合计: {total_tax_sum}, 价税合计: {total_sum}, 有效文件数: {len(rows)}')

    if skipped:
        messagebox.showwarning("警告", f"有 {len(skipped)} 个文件未能正确提取数据,已跳过。\n详细信息请查看 app.log 文件。")

    display_results(rows, total_amount_sum, total_tax_sum, total_sum, input_root, folder)
def main():
    """Build the start window (folder picker and start button) and run it.

    Blocks in the Tk event loop until the window is closed.
    """
    logging.info('程序启动')

    window = tk.Tk()
    window.title("发票金额统计系统")
    center_window(window, width=700, height=300)
    window.minsize(600, 280)
    window.columnconfigure(0, weight=1)
    window.rowconfigure(0, weight=1)

    # Outer frame that holds every widget of the start screen.
    content = ttk.Frame(window, padding="30")
    content.grid(column=0, row=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    content.columnconfigure(0, weight=1)

    # Title line.
    heading = ttk.Label(content, text="发票金额统计工具", font=('Arial', 18, 'bold'))
    heading.grid(column=0, row=0, pady=(0, 20))

    # Short description below the title.
    description = ttk.Label(
        content,
        text="自动提取电子发票中的金额、税额、价税合计、销售方信息及纳税人识别号",
        font=('Arial', 9),
        foreground='gray',
    )
    description.grid(column=0, row=1, pady=(0, 20))

    # Folder picker: caption, path entry and browse button.
    picker = ttk.Frame(content)
    picker.grid(column=0, row=2, sticky=(tk.W, tk.E), pady=5)
    picker.columnconfigure(0, weight=1)

    caption = ttk.Label(picker, text="选择发票文件夹:", font=('Arial', 10))
    caption.grid(column=0, row=0, sticky=tk.W)

    path_entry = ttk.Entry(picker, font=('Arial', 10))
    path_entry.grid(column=0, row=1, sticky=(tk.W, tk.E), pady=5)

    def on_browse():
        browse_folder(path_entry)

    def on_start():
        start_processing(path_entry, window)

    browse_btn = ttk.Button(picker, text="浏览", command=on_browse, width=10)
    browse_btn.grid(column=1, row=1, padx=(10, 0))

    # Start button.
    start_btn = ttk.Button(content, text="开始处理", command=on_start, width=20)
    start_btn.grid(column=0, row=3, pady=20)

    window.mainloop()
# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()