好友
阅读权限 25
听众
最后登录 1970-1-1
本帖最后由 Doublevv 于 2024-12-14 11:43 编辑
用SQL语句跨表查询excel表格
========================================
第二版
相较第一版
1.增加了保存查询方案、载入查询方案、删除查询方案功能,使用更加方便。
2.修改界面为800*600;
3.修改表别名为tb1、tb2,编写SQL语句时请相应修改。
界面
上源码:
[Python] 纯文本查看 复制代码
import tkinter as tkfrom tkinter import filedialog, messagebox, ttk, simpledialog
import os
import pandas as pd
import duckdb
import subprocess # 用于打开文件
import json
class CrossTableQueryTool:
def __init__(self, root):
self.root = root
self.root.title("Excel跨表格查询工具")
self.root.geometry("800x600") # 设置窗口大小
# 创建控件
self.label1 = tk.Label(root, text="1.添加要查询的Excel文件")
self.label1.place(x=10, y=10, width=200)
# 数据源文件操作按钮
self.btn_add = tk.Button(root, text="添加", command=self.add_table)
self.btn_add.place(x=380, y=10, width=120, height=30)
self.btn_delete = tk.Button(root, text="删除", command=self.delete_table)
self.btn_delete.place(x=520, y=10, width=120, height=30)
self.btn_load_solution = tk.Button(root, text="载入查询方案", command=self.load_solution_dialog)
self.btn_load_solution.place(x=660, y=10, width=120, height=30)
self.table_frame = tk.Frame(root)
self.table_frame.place(x=10, y=50, width=780, height=160)
self.table_tree = ttk.Treeview(self.table_frame, columns=("alias", "file_name", "file_path"), show="headings")
self.table_tree.heading("alias", text="别名")
self.table_tree.heading("file_name", text="文件名")
self.table_tree.heading("file_path", text="文件路径")
self.table_tree.column("alias", width=40)
self.table_tree.column("file_name", width=220)
self.table_tree.column("file_path", width=558)
self.table_tree.pack()
self.label2 = tk.Label(root, text="2.选择一个Excel文件保存查询结果")
self.label2.place(x=10, y=220, width=220)
self.btn_browse = tk.Button(root, text="浏览", command=self.browse_save_path)
self.btn_browse.place(x=520, y=220, width=120, height=30)
# 保存路径
self.label_save_path = tk.Label(root, text="保存路径:")
self.label_save_path.place(x=10, y=260, width=120)
self.entry_save_path = tk.Entry(root, width=640)
self.entry_save_path.place(x=132, y=260, width=644)
self.label3 = tk.Label(root, text="3.编写SQL语句")
self.label3.place(x=10, y=300, width=120)
self.txt_query = tk.Text(root, height=10, width=50)
self.txt_query.place(x=10, y=330, width=780, height=220)
# 执行查询、保存查询方案、删除查询方案按钮
self.btn_execute = tk.Button(root, text="执行查询", command=self.execute_query)
self.btn_execute.place(x=30, y=560, width=340, height=30)
self.btn_save_solution = tk.Button(root, text="保存查询方案", command=self.save_solution)
self.btn_save_solution.place(x=420, y=560, width=160, height=30)
self.btn_delete_solution = tk.Button(root, text="删除查询方案", command=self.delete_solution_dialog)
self.btn_delete_solution.place(x=600, y=560, width=160, height=30)
self.conn = duckdb.connect()
self.tables = {}
self.table_count = 0
def add_table(self):
if self.table_count >= 2:
messagebox.showerror("Error", "最多只能添加两个Excel文件。")
return
file_path = filedialog.askopenfilename(filetypes=[("Excel or csv files", "*.xlsx *.xls *.csv")])
if file_path:
alias = f"tb{self.table_count + 1}"
file_name = os.path.basename(file_path)
self.tables[alias] = file_path
self.table_tree.insert("", "end", values=(alias, file_name, file_path))
self.table_count += 1
def delete_table(self):
if not self.table_tree.selection():
messagebox.showerror("Error", "请先选择要删除的数据源文件。")
return
selected_item = self.table_tree.selection()[0]
alias = self.table_tree.item(selected_item)["values"][0]
del self.tables[alias]
self.table_tree.delete(selected_item)
self.table_count -= 1
def browse_save_path(self):
save_path = filedialog.asksaveasfilename(defaultextension=".xlsx",
filetypes=[("Excel files", "*.xlsx")]) or self.entry_save_path.get()
if save_path:
self.entry_save_path.delete(0, tk.END)
self.entry_save_path.insert(0, save_path)
def execute_query(self):
if not self.tables:
messagebox.showerror("Error", "请先添加Excel文件。")
return
if not self.txt_query.get("1.0", tk.END).strip():
messagebox.showerror("Error", "请输入SQL查询语句。")
return
if not self.entry_save_path.get():
messagebox.showerror("Error", "请输入保存路径。")
return
try:
# 读取Excel文件并加载到DuckDB中
for alias, file_path in self.tables.items():
if file_path.endswith(".csv"):
df = pd.read_csv(file_path, encoding='GBK') # 指定编码
elif file_path.endswith(".xls") or file_path.endswith(".xlsx"):
df = pd.read_excel(file_path)
else:
raise ValueError("不支持的文件类型")
self.conn.register(alias, df)
query = self.txt_query.get("1.0", tk.END).strip()
result = self.conn.execute(query).fetchdf()
# 保存结果到新的Excel文件
result.to_excel(self.entry_save_path.get(), index=False)
messagebox.showinfo("Success", "查询执行成功,结果已保存到新的Excel文件。")
# 自动打开生成的XLSX文件
subprocess.Popen([self.entry_save_path.get()], shell=True)
except Exception as e:
messagebox.showerror("Error", f"执行查询时发生错误: {str(e)}")
def save_solution(self):
solution_name = simpledialog.askstring("输入方案名称", "请输入查询方案名称:")
if solution_name:
solution_data = {
"name": solution_name,
"tables": [(alias, os.path.basename(path), path) for alias, path in self.tables.items()],
"save_path": self.entry_save_path.get(),
"sql_query": self.txt_query.get("1.0", tk.END).strip()
}
solutions = []
if os.path.exists("solutions.json"):
with open("solutions.json", "r", encoding="utf-8") as f:
solutions = json.load(f)
solutions.append(solution_data)
with open("solutions.json", "w", encoding="utf-8") as f:
json.dump(solutions, f, ensure_ascii=False, indent=4)
messagebox.showinfo("Success", f"查询方案 '{solution_name}' 已保存。")
def load_solution_dialog(self):
solutions = []
if os.path.exists("solutions.json"):
with open("solutions.json", "r", encoding="utf-8") as f:
solutions = json.load(f)
if not solutions:
messagebox.showerror("Error", "没有找到任何查询方案。")
return
solution_names = [sol["name"] for sol in solutions]
dialog = tk.Toplevel(self.root)
dialog.title("选择查询方案")
dialog.geometry("300x150")
label = tk.Label(dialog, text="请选择一个查询方案:")
label.pack(pady=10)
solution_var = tk.StringVar()
combobox_solutions = ttk.Combobox(dialog, textvariable=solution_var, values=solution_names)
combobox_solutions.pack(pady=5)
btn_select = tk.Button(dialog, text="选择",
command=lambda: self.load_selected_solution(solution_var.get(), dialog))
btn_select.pack(pady=10)
def load_selected_solution(self, selected_name, dialog):
if not selected_name:
messagebox.showerror("Error", "请选择一个查询方案。")
return
with open("solutions.json", "r", encoding="utf-8") as f:
solutions = json.load(f)
selected_solution = next((sol for sol in solutions if sol["name"] == selected_name), None)
if selected_solution:
self.tables.clear()
self.table_tree.delete(*self.table_tree.get_children())
self.table_count = 0
for alias, file_name, file_path in selected_solution["tables"]:
self.tables[alias] = file_path
self.table_tree.insert("", "end", values=(alias, file_name, file_path))
self.table_count += 1
self.entry_save_path.delete(0, tk.END)
self.entry_save_path.insert(0, selected_solution["save_path"])
self.txt_query.delete("1.0", tk.END)
self.txt_query.insert(tk.END, selected_solution["sql_query"])
messagebox.showinfo("Success", f"查询方案 '{selected_name}' 已加载。")
dialog.destroy()
else:
messagebox.showerror("Error", f"找不到名为 '{selected_name}' 的查询方案。")
def delete_solution_dialog(self):
solutions = []
if os.path.exists("solutions.json"):
with open("solutions.json", "r", encoding="utf-8") as f:
solutions = json.load(f)
if not solutions:
messagebox.showerror("Error", "没有找到任何查询方案。")
return
solution_names = [sol["name"] for sol in solutions]
dialog = tk.Toplevel(self.root)
dialog.title("删除方案")
dialog.geometry("300x150")
label = tk.Label(dialog, text="请选择要删除的方案:")
label.pack(pady=10)
solution_var = tk.StringVar()
combobox_solutions = ttk.Combobox(dialog, textvariable=solution_var, values=solution_names)
combobox_solutions.pack(pady=5)
btn_delete = tk.Button(dialog, text="删除",
command=lambda: self.delete_selected_solution(solution_var.get(), dialog))
btn_delete.pack(pady=10)
def delete_selected_solution(self, selected_name, dialog):
if not selected_name:
messagebox.showerror("Error", "请选择一个方案。")
return
with open("solutions.json", "r", encoding="utf-8") as f:
solutions = json.load(f)
updated_solutions = [sol for sol in solutions if sol["name"] != selected_name]
with open("solutions.json", "w", encoding="utf-8") as f:
json.dump(updated_solutions, f, ensure_ascii=False, indent=4)
messagebox.showinfo("Success", f"查询方案 '{selected_name}' 已删除。")
dialog.destroy()
if __name__ == "__main__":
root = tk.Tk()
app = CrossTableQueryTool(root)
root.mainloop()
========================================
第一版
界面展示:
前一阶段,一直在做2个excel表格关联查询,有一些BI软件可以做这事,都是比较大型的,直到看到头条推送了一条个人创作的小工具,感觉挺好,联系了他,可惜只在某宝售卖。
于是就用python弄了这个工具。
成品链接就不放了,对真正学习python的人,这个问题很容易解决。
测试数据
https://wwic.lanzouo.com/ieAee2gwb6be 密码52pj
存在问题:使用csv文件编码只能是GBK,可以用wps另存一下,就是GBK编码的了。
功能扩展暂没有精力弄了,想修改的自己解决吧。
源码: (不保证代码质量,未经完全测试,使用自担风险 )
[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import os
import pandas as pd
import duckdb
import subprocess # 用于打开文件
class CrossTableQueryTool:
def __init__(self, root):
self.root = root
self.root.title("Excel跨表查询工具")
self.root.geometry("500x460") # 设置窗口大小
# 创建控件
self.label1 = tk.Label(root, text="1. 选择2个Excel文件比对查询")
self.label1.place(x=4, y=10, width=200)
# 数据源文件操作按钮
self.btn_add = tk.Button(root, text="添加", command=self.add_table)
self.btn_add.place(x=240, y=10, width=60, height=22)
self.btn_delete = tk.Button(root, text="删除", command=self.delete_table)
self.btn_delete.place(x=320, y=10, width=60, height=22)
self.table_frame = tk.Frame(root)
self.table_frame.place(x=10, y=45, width=480, height=75)
self.table_tree = ttk.Treeview(self.table_frame, columns=("file_name", "alias", "file_path"), show="headings")
self.table_tree.heading("file_name", text="文件名")
self.table_tree.heading("alias", text="别名")
self.table_tree.heading("file_path", text="文件路径")
self.table_tree.column("file_name", width=120)
self.table_tree.column("alias", width=45)
self.table_tree.column("file_path", width=313)
self.table_tree.pack()
self.label2 = tk.Label(root, text="2.选择一个Excel文件保存查询结果")
self.label2.place(x=4, y=125, width=220)
# 保存路径和浏览按钮
self.label_save_path = tk.Label(root, text="保存路径:")
self.label_save_path.place(x=4, y=155, width=80)
self.entry_save_path = tk.Entry(root, width=40)
self.entry_save_path.place(x=80, y=155, width=340)
self.btn_browse = tk.Button(root, text="浏览", command=self.browse_save_path)
self.btn_browse.place(x=430, y=155, width=60, height=22)
self.label3 = tk.Label(root, text="3.编写SQL查询语句")
self.label3.place(x=15, y=185, width=120)
self.txt_query = tk.Text(root, height=10, width=50)
self.txt_query.place(x=10, y=210, width=480, height=200)
self.btn_execute = tk.Button(root, text="执行查询", command=self.execute_query)
self.btn_execute.place(x=100, y=420, width=300)
self.conn = duckdb.connect()
self.tables = {}
self.table_count = 0
def add_table(self):
if self.table_count >= 2:
messagebox.showerror("Error", "最多只能添加两个Excel文件。")
return
file_path = filedialog.askopenfilename(filetypes=[("Excel or csv files", "*.xlsx *.xls *.csv")])
if file_path:
file_name = os.path.basename(file_path)
alias = f"table{self.table_count + 1}"
self.tables[alias] = file_path
self.table_tree.insert("", "end", values=(file_name, alias, file_path))
self.table_count += 1
def delete_table(self):
if not self.table_tree.selection():
messagebox.showerror("Error", "请先选择要删除的数据源文件。")
return
selected_item = self.table_tree.selection()[0]
alias = self.table_tree.item(selected_item)["values"][1]
del self.tables[alias]
self.table_tree.delete(selected_item)
self.table_count -= 1
def browse_save_path(self):
save_path = filedialog.asksaveasfilename(defaultextension=".xlsx",
filetypes=[("Excel files", "*.xlsx")]) or self.entry_save_path.get()
if save_path:
self.entry_save_path.delete(0, tk.END)
self.entry_save_path.insert(0, save_path)
def execute_query(self):
if not self.tables:
messagebox.showerror("Error", "请先添加Excel文件。")
return
if not self.txt_query.get("1.0", tk.END).strip():
messagebox.showerror("Error", "请输入SQL查询语句。")
return
if not self.entry_save_path.get():
messagebox.showerror("Error", "请输入保存路径。")
return
try:
# 读取Excel文件并加载到DuckDB中
for alias, file_path in self.tables.items():
if file_path.endswith(".csv"):
df = pd.read_csv(file_path, encoding='GBK') # 指定编码
elif file_path.endswith(".xls") or file_path.endswith(".xlsx"):
df = pd.read_excel(file_path)
else:
raise ValueError("不支持的文件类型")
self.conn.register(alias, df)
query = self.txt_query.get("1.0", tk.END).strip()
result = self.conn.execute(query).fetchdf()
# 保存结果到新的Excel文件
result.to_excel(self.entry_save_path.get(), index=False)
messagebox.showinfo("Success", "查询执行成功,结果已保存到新的Excel文件。")
# 自动打开生成的XLSX文件
subprocess.Popen([self.entry_save_path.get()], shell=True)
except Exception as e:
messagebox.showerror("Error", f"执行查询时发生错误: {str(e)}")
if __name__ == "__main__":
root = tk.Tk()
app = CrossTableQueryTool(root)
root.mainloop()
免费评分
查看全部评分