# [Python] 纯文本查看 复制代码  (forum code-viewer header left over from copy/paste; not part of the program)
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import sqlite3
import datetime
import math
from openpyxl import Workbook
import matplotlib
matplotlib.use("TkAgg")
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
from matplotlib import rcParams
rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans'] # Font list chosen so Chinese text can be rendered
rcParams['axes.unicode_minus'] = False
# ---------- Database initialisation ----------
# Path of the SQLite file; every helper below opens its own connection to it.
DB_NAME = "ct_production.db"
def init_db():
    """Create all application tables and seed the default global parameters.

    Every statement uses ``CREATE TABLE IF NOT EXISTS`` / ``INSERT OR IGNORE``,
    so the function can be called on each start-up. Columns that were added
    after the first release are back-filled into older database files by
    ``_ensure_column``. The connection is always closed, even if a statement
    fails.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        # Production lines
        c.execute('''CREATE TABLE IF NOT EXISTS production_lines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            line_no TEXT UNIQUE NOT NULL,
            line_name TEXT NOT NULL,
            line_type TEXT NOT NULL,
            machine_model TEXT,
            process_notes TEXT
        )''')
        # Shifts
        c.execute('''CREATE TABLE IF NOT EXISTS shift_config (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            shift_name TEXT NOT NULL,
            total_minutes INTEGER NOT NULL,
            fixed_break_minutes INTEGER DEFAULT 0
        )''')
        # Global parameters (key/value, values stored as text)
        c.execute('''CREATE TABLE IF NOT EXISTS global_params (
            param_name TEXT PRIMARY KEY,
            param_value TEXT NOT NULL
        )''')
        defaults = [
            ("default_oee", "0.85"),
            ("defect_rate", "0.02"),
            ("unit", "PCS/小时"),
            ("decimal_places", "2"),
        ]
        # OR IGNORE keeps values the user has already changed.
        c.executemany("INSERT OR IGNORE INTO global_params VALUES (?,?)", defaults)
        # Products
        c.execute('''CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_model TEXT UNIQUE NOT NULL,
            description TEXT,
            line_id INTEGER,
            FOREIGN KEY (line_id) REFERENCES production_lines(id)
        )''')
        _ensure_column(c, "products", "line_id",
                       "INTEGER REFERENCES production_lines(id)")
        # Processes of a product
        c.execute('''CREATE TABLE IF NOT EXISTS product_processes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER NOT NULL,
            seq INTEGER NOT NULL,
            process_name TEXT NOT NULL,
            standard_ct_sec REAL,
            measured_ct_sec REAL,
            notes TEXT,
            locked INTEGER DEFAULT 0,
            FOREIGN KEY (product_id) REFERENCES products(id)
        )''')
        # Individual cycle-time measurements of a process
        c.execute('''CREATE TABLE IF NOT EXISTS process_cycle_times (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            process_id INTEGER NOT NULL,
            cycle_time REAL NOT NULL,
            note TEXT,
            measurer TEXT DEFAULT '',
            measure_date TEXT DEFAULT '',
            FOREIGN KEY (process_id) REFERENCES product_processes(id) ON DELETE CASCADE
        )''')
        _ensure_column(c, "process_cycle_times", "measurer", "TEXT DEFAULT ''")
        _ensure_column(c, "process_cycle_times", "measure_date", "TEXT DEFAULT ''")
        # Worker head-count per process (newer addition)
        c.execute('''CREATE TABLE IF NOT EXISTS process_manpower (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            process_id INTEGER NOT NULL UNIQUE,
            worker_count INTEGER DEFAULT 1,
            FOREIGN KEY (process_id) REFERENCES product_processes(id) ON DELETE CASCADE
        )''')
        # Legacy CT record table
        c.execute('''CREATE TABLE IF NOT EXISTS ct_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            line_id INTEGER NOT NULL,
            product_model TEXT NOT NULL,
            process_name TEXT,
            work_order TEXT,
            standard_ct_sec REAL,
            measured_ct_sec REAL,
            measurer TEXT,
            measure_date TEXT,
            notes TEXT,
            locked INTEGER DEFAULT 0,
            version INTEGER DEFAULT 1,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (line_id) REFERENCES production_lines(id)
        )''')
        # Capacity calculation log
        c.execute('''CREATE TABLE IF NOT EXISTS capacity_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_model TEXT,
            shift_name TEXT,
            bottleneck_ct REAL,
            hourly_cap REAL,
            shift_cap REAL,
            calc_time TEXT,
            line_id INTEGER
        )''')
        # Line-balance snapshots and their per-process rows
        c.execute('''CREATE TABLE IF NOT EXISTS line_balance_snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_name TEXT NOT NULL,
            product_model TEXT NOT NULL,
            line_id INTEGER,
            balance_rate REAL,
            bottleneck_ct REAL,
            takt_time REAL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS snapshot_processes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_id INTEGER NOT NULL,
            seq INTEGER,
            process_name TEXT,
            standard_ct_sec REAL,
            measured_ct_sec REAL,
            FOREIGN KEY (snapshot_id) REFERENCES line_balance_snapshots(id)
        )''')
        conn.commit()
    finally:
        conn.close()


def _ensure_column(cursor, table, column, definition):
    """Add ``column`` to ``table`` when an older database file lacks it.

    ``table``, ``column`` and ``definition`` are trusted constants supplied by
    ``init_db``; they are interpolated because SQLite cannot bind identifiers.
    """
    cursor.execute(f"PRAGMA table_info({table})")
    existing = {row[1] for row in cursor.fetchall()}
    if column in existing:
        return
    try:
        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    except sqlite3.OperationalError:
        # Best-effort migration: a failed ALTER must not stop start-up,
        # but unrelated errors are no longer swallowed.
        pass
# ---------- 业务逻辑 ----------
def calc_planned_uptime_min(shift_name):
    """Return the planned running time of a shift in minutes.

    The value is ``total_minutes - fixed_break_minutes`` of the first
    ``shift_config`` row whose name matches. When no such shift exists the
    function falls back to 480 minutes.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        row = conn.execute(
            "SELECT total_minutes, fixed_break_minutes FROM shift_config WHERE shift_name=?",
            (shift_name,),
        ).fetchone()
    finally:
        # Close even when the query raises.
        conn.close()
    if row is None:
        return 480
    total_minutes, break_minutes = row
    # fixed_break_minutes is nullable in the schema; treat NULL as "no break".
    return total_minutes - (break_minutes or 0)
def get_default_params():
    """Return every row of ``global_params`` as a ``{name: value}`` dict.

    Values are returned exactly as stored (text); callers convert them.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute("SELECT param_name, param_value FROM global_params").fetchall()
    finally:
        # Close even when the query raises.
        conn.close()
    return dict(rows)
def calculate_capacity(bottleneck_ct, shift_name, efficiency=1.0, defect_rate=0.02,
                       shifts_per_day=1, days_per_month=30):
    """Derive hourly, shift, daily and monthly capacity from the bottleneck CT.

    Args:
        bottleneck_ct: Cycle time of the slowest process in seconds. ``None``
            or a non-positive value yields a capacity of 0.
        shift_name: Shift whose planned uptime is looked up through
            ``calc_planned_uptime_min``.
        efficiency: Multiplier applied to the theoretical hourly output.
        defect_rate: Fraction of output lost to defects.
        shifts_per_day: Number of such shifts per day.
        days_per_month: Working days used for the monthly figure
            (30 reproduces the previous hard-coded behaviour).

    Returns:
        dict with ``hourly_theory``, ``hourly_actual``, ``shift_cap``,
        ``daily_cap`` and ``monthly_cap`` (each rounded to 2 decimals) plus
        the unrounded ``planned_hour``, ``efficiency`` and ``defect_rate``.
    """
    planned_hour = calc_planned_uptime_min(shift_name) / 60.0
    if bottleneck_ct is not None and bottleneck_ct > 0:
        hourly_theory = 3600 / bottleneck_ct
    else:
        hourly_theory = 0
    hourly_actual = hourly_theory * efficiency * (1 - defect_rate)
    shift_cap = hourly_actual * planned_hour
    daily_cap = shift_cap * shifts_per_day
    monthly_cap = daily_cap * days_per_month
    return {
        "hourly_theory": round(hourly_theory, 2),
        "hourly_actual": round(hourly_actual, 2),
        "shift_cap": round(shift_cap, 2),
        "daily_cap": round(daily_cap, 2),
        "monthly_cap": round(monthly_cap, 2),
        "planned_hour": planned_hour,
        "efficiency": efficiency,
        "defect_rate": defect_rate,
    }
def get_product_processes(product_model):
    """Load the ordered process list of a product.

    Args:
        product_model: Value of ``products.product_model``.

    Returns:
        ``(processes, line_id)``. ``processes`` is a list of dicts with the
        keys ``db_id``, ``seq``, ``name``, ``std_ct``, ``meas_ct``, ``notes``,
        ``locked`` and ``worker_count``, ordered by ``seq``. ``line_id`` is the
        product's production line. ``([], None)`` is returned when the product
        does not exist.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT id, line_id FROM products WHERE product_model=?", (product_model,))
        prod = c.fetchone()
        if not prod:
            return [], None
        prod_id, line_id = prod
        # One LEFT JOIN replaces the per-process head-count lookup; processes
        # without a manpower row (or with a NULL count) default to 1 worker.
        c.execute(
            """SELECT p.id, p.seq, p.process_name, p.standard_ct_sec,
                      p.measured_ct_sec, p.notes, p.locked,
                      COALESCE(m.worker_count, 1)
               FROM product_processes AS p
               LEFT JOIN process_manpower AS m ON m.process_id = p.id
               WHERE p.product_id=? ORDER BY p.seq""",
            (prod_id,),
        )
        rows = c.fetchall()
    finally:
        # Close on every path, including the early return above.
        conn.close()
    processes = [
        {
            "db_id": r[0], "seq": r[1], "name": r[2],
            "std_ct": r[3], "meas_ct": r[4], "notes": r[5],
            "locked": r[6], "worker_count": r[7],
        }
        for r in rows
    ]
    return processes, line_id
def get_cycle_times(process_id):
    """Return all recorded cycle times of a process, oldest first.

    Each element is a dict with the keys ``id``, ``time``, ``note``,
    ``measurer`` and ``measure_date``.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute(
            "SELECT id, cycle_time, note, measurer, measure_date FROM process_cycle_times WHERE process_id=? ORDER BY id",
            (process_id,),
        ).fetchall()
    finally:
        # Close even when the query raises.
        conn.close()
    return [
        {"id": r[0], "time": r[1], "note": r[2], "measurer": r[3], "measure_date": r[4]}
        for r in rows
    ]
def calc_avg_cycle_time(cycle_list):
    """Average the ``"time"`` values of a list of cycle-time records.

    With five or more samples the single smallest and single largest value
    are dropped before averaging (a trimmed mean); with fewer samples the
    plain mean is used.

    Returns:
        The average as a float, or ``None`` when ``cycle_list`` is empty.
    """
    if not cycle_list:
        return None
    times = [record["time"] for record in cycle_list]
    if len(times) >= 5:
        # At least three values remain after trimming, so no empty-list guard
        # is needed here.
        times = sorted(times)[1:-1]
    return sum(times) / len(times)
def calculate_bottleneck_and_balance(processes):
    """Find the bottleneck process and the line-balance rate.

    The effective CT of a process is its ``std_ct`` when set, otherwise its
    ``meas_ct``; processes with neither are ignored.

    Returns:
        ``(bottleneck, balance, max_ct)``: the process dict with the largest
        effective CT (first one on ties), the balance rate in percent rounded
        to 2 decimals (``sum(ct) / (max_ct * count) * 100``) and that largest
        CT. ``(None, 0, 0)`` is returned when nothing can be evaluated.
    """
    if not processes:
        return None, 0, 0

    # Pair every process with its effective CT, dropping the ones without any.
    candidates = []
    for proc in processes:
        effective = proc["meas_ct"] if proc["std_ct"] is None else proc["std_ct"]
        if effective is None:
            continue
        candidates.append((proc, effective))
    if not candidates:
        return None, 0, 0

    # Single pass: accumulate the total and track the first maximum.
    bottleneck, max_ct = candidates[0]
    total_ct = 0
    for proc, effective in candidates:
        total_ct += effective
        if effective > max_ct:
            bottleneck, max_ct = proc, effective

    if max_ct > 0:
        balance = (total_ct / (max_ct * len(candidates))) * 100
    else:
        balance = 0
    return bottleneck, round(balance, 2), max_ct
# ---------- 主界面 ----------
class MainApplication(tk.Tk):
def __init__(self):
    """Set up the main window, build every tab and centre it on screen."""
    super().__init__()
    # Window chrome
    self.title("产能管理工具 v1.0 (线平衡)")
    self.geometry("1300x750")
    self.resizable(width=True, height=True)
    # Route the window-manager close button through on_closing.
    self.protocol("WM_DELETE_WINDOW", self.on_closing)
    # Content first, then placement (the size must be known to centre).
    self.create_widgets()
    self.center_window()
def on_closing(self):
    """Close handler: release matplotlib figures, then destroy the window."""
    plt.close("all")  # free matplotlib resources
    self.destroy()
def center_window(self):
    """Move the window so it is centred on the screen."""
    # Flush pending geometry work so the reported size is current.
    self.update_idletasks()
    width = self.winfo_width()
    height = self.winfo_height()
    left = (self.winfo_screenwidth() - width) // 2
    top = (self.winfo_screenheight() - height) // 2
    self.geometry("+{}+{}".format(left, top))
def create_widgets(self):
    """Create the menu bar and the notebook with its six tabs."""
    menu_bar = tk.Menu(self)
    help_dropdown = tk.Menu(menu_bar, tearoff=0)
    help_dropdown.add_command(label="使用须知", command=self.show_disclaimer)
    menu_bar.add_cascade(label="帮助", menu=help_dropdown)
    self.config(menu=menu_bar)

    self.notebook = ttk.Notebook(self)
    self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

    # Six tabs: (attribute holding the frame, caption, builder method).
    # The order below is the on-screen order; tab6 sits before tab5.
    tab_specs = (
        ("tab1", "基础配置", "build_config_tab"),
        ("tab2", "品番工序与CT", "build_process_tab"),
        ("tab3", "效率计算", "build_efficiency_tab"),
        ("tab4", "产能测算", "build_capacity_tab"),
        ("tab6", "线平衡分析", "build_balance_analysis_tab"),
        ("tab5", "报表与导出", "build_report_tab"),
    )
    for attr_name, caption, builder_name in tab_specs:
        page = ttk.Frame(self.notebook)
        setattr(self, attr_name, page)
        self.notebook.add(page, text=caption)
        # Each builder populates the frame that was just registered.
        getattr(self, builder_name)()
def show_disclaimer(self):
    """Show the usage notice in an information dialog."""
    notice_lines = [
        "使用须知:",
        "1. 请妥善保管个人数据,定期备份数据库文件(ct_production.db)。",
        "2. 开发者仅提供软件使用授权,不对误操作导致的数据丢失负责。",
        "3. 最终解释权归开发者所有。",
    ]
    messagebox.showinfo("使用须知", "\n".join(notice_lines))
# ================== 全局刷新 ==================
def refresh_all_ui(self):
    """Reload every list, combobox and dependent view from the database."""
    self.refresh_line_list()
    self.refresh_shift_list()
    self.refresh_all_combos()

    # The widgets below belong to tabs that may not have been built yet.
    product_box = getattr(self, 'product_combo', None)
    if product_box is not None and product_box.get():
        self.load_processes()

    balance_box = getattr(self, 'balance_product_combo', None)
    if balance_box is not None and balance_box.get():
        self.refresh_balance_chart()
def refresh_all_combos(self):
    """Refresh the production-line and shift comboboxes on every tab.

    The previous selection is kept when it still exists, otherwise the first
    entry is selected. When the option list is empty the selection is now
    cleared, so no handler runs against a line or shift that was deleted.
    Comboboxes of tabs that have not been built yet are skipped.
    """
    line_values = self.get_line_values()
    # Combobox attribute -> selection handler that reloads its dependants.
    line_combos = {
        'filter_line': 'on_filter_line',
        'cap_line_combo': 'cap_on_line',
        'balance_line_combo': 'on_balance_line',
    }
    for attr, handler_name in line_combos.items():
        if not hasattr(self, attr):
            continue
        combo = getattr(self, attr)
        old = combo.get() if combo['values'] else None
        combo['values'] = line_values
        if old in line_values:
            combo.set(old)
        elif line_values:
            combo.current(0)
        else:
            # Nothing left to select: drop the stale text.
            combo.set('')
        if combo.get():
            getattr(self, handler_name)(None)

    if hasattr(self, 'cap_shift'):
        conn = sqlite3.connect(DB_NAME)
        try:
            shifts = [r[0] for r in conn.execute("SELECT shift_name FROM shift_config")]
        finally:
            conn.close()
        old = self.cap_shift.get() if self.cap_shift['values'] else None
        self.cap_shift['values'] = shifts
        if old in shifts:
            self.cap_shift.set(old)
        elif shifts:
            self.cap_shift.current(0)
        else:
            self.cap_shift.set('')
def get_line_values(self):
    """Return the combobox captions ``"<line_no> - <line_name>"`` of all lines."""
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute("SELECT line_no || ' - ' || line_name FROM production_lines").fetchall()
    finally:
        # Close even when the query raises.
        conn.close()
    return [r[0] for r in rows]
# ================== 基础配置 ==================
def build_config_tab(self):
    """Build the basic-configuration tab (``self.tab1``).

    The tab holds three sections, top to bottom: the production-line editor,
    the shift editor and the default-parameter form. Both lists are filled
    from the database at the end.
    """
    # ---- Production-line section: entry form, buttons and list ----
    f_line = ttk.LabelFrame(self.tab1, text="产线基础信息", padding=10)
    f_line.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    f_line.columnconfigure(0, weight=1)
    f_line.columnconfigure(1, weight=1)
    ttk.Label(f_line, text="产线:").grid(row=0, column=0, sticky="e", padx=2)
    self.line_no = ttk.Entry(f_line, width=12)
    self.line_no.grid(row=0, column=1, sticky="w", padx=2)
    ttk.Label(f_line, text="名称:").grid(row=0, column=2, sticky="e", padx=2)
    self.line_name = ttk.Entry(f_line, width=15)
    self.line_name.grid(row=0, column=3, sticky="w", padx=2)
    ttk.Label(f_line, text="类型:").grid(row=0, column=4, sticky="e", padx=2)
    self.line_type = ttk.Combobox(f_line, values=["单工位独立产线", "多工位串联流水线"], state="readonly", width=18)
    self.line_type.grid(row=0, column=5, sticky="w", padx=2)
    # Preselect the second entry of the type list.
    self.line_type.current(1)
    # NOTE(review): the caption reads "品番" but add_line/update_line store this
    # entry in the machine_model column - confirm the intended label.
    ttk.Label(f_line, text="品番:").grid(row=1, column=0, sticky="e", padx=2)
    self.line_machine = ttk.Entry(f_line, width=20)
    self.line_machine.grid(row=1, column=1, columnspan=2, sticky="w", padx=2)
    ttk.Label(f_line, text="备注:").grid(row=1, column=3, sticky="e", padx=2)
    self.line_notes = ttk.Entry(f_line, width=20)
    self.line_notes.grid(row=1, column=4, columnspan=2, sticky="w", padx=2)
    btn_frame = ttk.Frame(f_line)
    btn_frame.grid(row=2, column=0, columnspan=6, pady=8)
    ttk.Button(btn_frame, text="添加", command=self.add_line).pack(side=tk.LEFT, padx=5)
    ttk.Button(btn_frame, text="修改", command=self.update_line).pack(side=tk.LEFT, padx=5)
    ttk.Button(btn_frame, text="删除", command=self.delete_line).pack(side=tk.LEFT, padx=5)
    # The first column carries the database id that update/delete rely on.
    self.line_tree = ttk.Treeview(f_line, columns=("id","no","name","type"), show="headings", height=5)
    self.line_tree.heading("id", text="ID")
    self.line_tree.heading("no", text="产线")
    self.line_tree.heading("name", text="名称")
    self.line_tree.heading("type", text="类型")
    self.line_tree.column("id", width=30)
    self.line_tree.grid(row=3, column=0, columnspan=6, sticky="nsew", pady=5)
    # Double-click copies the row back into the entry fields.
    self.line_tree.bind("<Double-1>", self.on_line_select)
    f_line.rowconfigure(3, weight=1)
    # ---- Shift section: entry form, buttons and list ----
    f_shift = ttk.LabelFrame(self.tab1, text="班次配置", padding=10)
    f_shift.pack(fill=tk.X, padx=5, pady=5)
    f_shift.columnconfigure(0, weight=1)
    f_shift.columnconfigure(1, weight=1)
    ttk.Label(f_shift, text="班次:").grid(row=0, column=0, sticky="e", padx=2)
    self.shift_name = ttk.Entry(f_shift, width=12)
    self.shift_name.grid(row=0, column=1, sticky="w", padx=2)
    ttk.Label(f_shift, text="总时长(分):").grid(row=0, column=2, sticky="e", padx=2)
    self.shift_total = ttk.Entry(f_shift, width=8)
    self.shift_total.grid(row=0, column=3, sticky="w", padx=2)
    ttk.Label(f_shift, text="固定停机(分):").grid(row=0, column=4, sticky="e", padx=2)
    self.shift_break = ttk.Entry(f_shift, width=8)
    self.shift_break.grid(row=0, column=5, sticky="w", padx=2)
    shift_btn = ttk.Frame(f_shift)
    shift_btn.grid(row=0, column=6, columnspan=2, padx=5)
    ttk.Button(shift_btn, text="新增", command=self.add_shift).pack(side=tk.LEFT, padx=2)
    ttk.Button(shift_btn, text="修改", command=self.update_shift).pack(side=tk.LEFT, padx=2)
    ttk.Button(shift_btn, text="删除", command=self.delete_shift).pack(side=tk.LEFT, padx=2)
    # The "eff" column is filled by refresh_shift_list with total minus break.
    self.shift_tree = ttk.Treeview(f_shift, columns=("id","name","total","break","eff"), show="headings", height=4)
    self.shift_tree.heading("id", text="ID")
    self.shift_tree.heading("name", text="班次")
    self.shift_tree.heading("total", text="总时长(分)")
    self.shift_tree.heading("break", text="停机(分)")
    self.shift_tree.heading("eff", text="稼动时间(分)")
    self.shift_tree.column("id", width=30, anchor="center")
    self.shift_tree.grid(row=1, column=0, columnspan=8, sticky="nsew", pady=5)
    self.shift_tree.bind("<Double-1>", self.on_shift_select)
    f_shift.rowconfigure(1, weight=1)
    # ---- Default parameters, pre-filled from the global_params table ----
    f_param = ttk.LabelFrame(self.tab1, text="默认参数", padding=10)
    f_param.pack(fill=tk.X, padx=5, pady=5)
    ttk.Label(f_param, text="稼动率(%):").grid(row=0, column=0, sticky="e", padx=2)
    self.param_oee = ttk.Entry(f_param, width=8)
    self.param_oee.insert(0, get_default_params()["default_oee"])
    self.param_oee.grid(row=0, column=1, sticky="w", padx=2)
    ttk.Label(f_param, text="不良率(%):").grid(row=0, column=2, sticky="e", padx=2)
    self.param_defect = ttk.Entry(f_param, width=8)
    self.param_defect.insert(0, get_default_params()["defect_rate"])
    self.param_defect.grid(row=0, column=3, sticky="w", padx=2)
    ttk.Button(f_param, text="保存", command=self.save_default_params).grid(row=0, column=4, padx=10)
    # Initial fill of both lists.
    self.refresh_line_list()
    self.refresh_shift_list()
def add_line(self):
    """Insert a new production line from the entry fields.

    Line number and name are mandatory. A duplicate line number (UNIQUE
    constraint) is reported to the user. The connection is closed on every
    path, including the duplicate case.
    """
    no, name = self.line_no.get().strip(), self.line_name.get().strip()
    if not no or not name:
        return messagebox.showwarning("提示", "产线和名称必填")
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            "INSERT INTO production_lines (line_no,line_name,line_type,machine_model,process_notes) VALUES (?,?,?,?,?)",
            # Strip the optional fields as well, matching update_line.
            (no, name, self.line_type.get(),
             self.line_machine.get().strip(), self.line_notes.get().strip()),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        messagebox.showerror("错误", "产线编号重复")
        return
    finally:
        conn.close()
    self.refresh_all_ui()
def update_line(self):
    """Write the entry fields back to the production line selected in the list.

    Does nothing without a selection. Applies the same checks as ``add_line``:
    number and name are mandatory and a duplicate line number is reported
    instead of raising out of the Tk callback.
    """
    sel = self.line_tree.selection()
    if not sel:
        return
    lid = self.line_tree.item(sel[0])['values'][0]
    no, name = self.line_no.get().strip(), self.line_name.get().strip()
    if not no or not name:
        return messagebox.showwarning("提示", "产线和名称必填")
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            "UPDATE production_lines SET line_no=?,line_name=?,line_type=?,machine_model=?,process_notes=? WHERE id=?",
            (no, name, self.line_type.get(),
             self.line_machine.get().strip(), self.line_notes.get().strip(), lid),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # line_no is UNIQUE: another line already uses this number.
        messagebox.showerror("错误", "产线编号重复")
        return
    finally:
        conn.close()
    self.refresh_all_ui()
def delete_line(self):
    """Delete the selected production line together with all dependent data.

    After confirmation the line's products, their processes and the
    processes' cycle-time and manpower rows are removed in one transaction.
    The child rows are deleted explicitly because no connection in this file
    enables ``PRAGMA foreign_keys``, so the declared ``ON DELETE CASCADE``
    cannot be relied upon.
    """
    sel = self.line_tree.selection()
    if not sel:
        return
    if not messagebox.askyesno("确认", "删除产线将清除关联数据,继续?"):
        return
    lid = self.line_tree.item(sel[0])['values'][0]
    # Sub-select naming every process that belongs to a product of this line.
    line_processes = ("SELECT id FROM product_processes WHERE product_id IN "
                      "(SELECT id FROM products WHERE line_id=?)")
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        # Children first, parents last.
        c.execute(f"DELETE FROM process_cycle_times WHERE process_id IN ({line_processes})", (lid,))
        c.execute(f"DELETE FROM process_manpower WHERE process_id IN ({line_processes})", (lid,))
        c.execute("DELETE FROM product_processes WHERE product_id IN (SELECT id FROM products WHERE line_id=?)", (lid,))
        c.execute("DELETE FROM products WHERE line_id=?", (lid,))
        c.execute("DELETE FROM production_lines WHERE id=?", (lid,))
        conn.commit()
    finally:
        # Closing without commit discards a partially executed delete.
        conn.close()
    self.refresh_all_ui()
    messagebox.showinfo("完成", "已删除")
def refresh_line_list(self):
    """Reload the production-line list from the database.

    No-op until the list widget has been created. The rows are fetched before
    the list is cleared, so a failing query no longer leaves the list empty.
    """
    if not hasattr(self, 'line_tree'):
        return
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute("SELECT id, line_no, line_name, line_type FROM production_lines").fetchall()
    finally:
        conn.close()
    self.line_tree.delete(*self.line_tree.get_children())
    for r in rows:
        self.line_tree.insert("", tk.END, values=r)
def on_line_select(self, event):
    """Double-click handler: load the selected production line into the form.

    All five editable fields are read from the database by the row's id. The
    list only shows number, name and type, so machine model and notes used to
    stay untouched in the form, and a following "update" overwrote the stored
    values with whatever text happened to be there. Reading from the database
    also avoids relying on how Tk converts the displayed cell values.
    """
    sel = self.line_tree.selection()
    if not sel:
        return
    line_id = self.line_tree.item(sel[0])['values'][0]
    conn = sqlite3.connect(DB_NAME)
    try:
        row = conn.execute(
            "SELECT line_no, line_name, line_type, machine_model, process_notes "
            "FROM production_lines WHERE id=?",
            (line_id,),
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        # The row was deleted since the list was last refreshed.
        return
    line_no, line_name, line_type, machine_model, process_notes = row
    for entry, value in (
        (self.line_no, line_no),
        (self.line_name, line_name),
        (self.line_machine, machine_model),
        (self.line_notes, process_notes),
    ):
        entry.delete(0, tk.END)
        # machine_model / process_notes are nullable.
        entry.insert(0, value if value is not None else "")
    self.line_type.set(line_type)
def add_shift(self):
    """Insert a new shift from the entry fields.

    The name is mandatory and the total duration must be a whole number of
    minutes; an invalid break value is stored as 0. ``shift_config`` has no
    UNIQUE constraint on the name, so duplicates are rejected explicitly;
    other code looks shifts up by name.
    """
    name = self.shift_name.get().strip()
    total_text = self.shift_total.get().strip()
    # isdecimal() matches exactly what int() can parse; isdigit() also accepts
    # characters such as superscript digits that int() rejects.
    if not name or not total_text.isdecimal():
        return messagebox.showwarning("提示", "请输入班次名和有效数字")
    total = int(total_text)
    break_text = self.shift_break.get().strip()
    brk = int(break_text) if break_text.isdecimal() else 0
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT 1 FROM shift_config WHERE shift_name=?", (name,))
        if c.fetchone():
            messagebox.showerror("错误", "班次名称已存在")
            return
        c.execute("INSERT INTO shift_config (shift_name,total_minutes,fixed_break_minutes) VALUES (?,?,?)",
                  (name, total, brk))
        conn.commit()
    except sqlite3.IntegrityError:
        # Kept for database files that carry their own unique index.
        messagebox.showerror("错误", "班次名称已存在")
        return
    finally:
        conn.close()
    self.refresh_all_ui()
def update_shift(self):
    """Write the entry fields back to the shift selected in the list.

    Applies the same validation as ``add_shift`` and refuses to rename a
    shift to a name that another shift already uses.
    """
    sel = self.shift_tree.selection()
    if not sel:
        return messagebox.showwarning("提示", "请先选择要修改的班次")
    shift_id = self.shift_tree.item(sel[0])['values'][0]
    name = self.shift_name.get().strip()
    total_text = self.shift_total.get().strip()
    # isdecimal() matches exactly what int() can parse.
    if not name or not total_text.isdecimal():
        return messagebox.showwarning("提示", "请输入班次名和有效数字")
    total = int(total_text)
    break_text = self.shift_break.get().strip()
    brk = int(break_text) if break_text.isdecimal() else 0
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        # Shifts are looked up by name elsewhere, so names must stay unique.
        c.execute("SELECT 1 FROM shift_config WHERE shift_name=? AND id<>?", (name, shift_id))
        if c.fetchone():
            messagebox.showerror("错误", "班次名称已存在")
            return
        c.execute("UPDATE shift_config SET shift_name=?, total_minutes=?, fixed_break_minutes=? WHERE id=?",
                  (name, total, brk, shift_id))
        conn.commit()
    finally:
        conn.close()
    self.refresh_all_ui()
    messagebox.showinfo("成功", "班次已更新")
def delete_shift(self):
    """Delete the shift selected in the list after confirmation."""
    sel = self.shift_tree.selection()
    if not sel:
        return
    if not messagebox.askyesno("确认", "确定删除所选班次?"):
        return
    shift_id = self.shift_tree.item(sel[0])['values'][0]
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("DELETE FROM shift_config WHERE id=?", (shift_id,))
        conn.commit()
    finally:
        # Close even when the delete raises.
        conn.close()
    self.refresh_all_ui()
    messagebox.showinfo("完成", "班次已删除")
def refresh_shift_list(self):
    """Reload the shift list; the last column shows total minus break minutes.

    No-op until the list widget has been created.
    """
    if not hasattr(self, 'shift_tree'):
        return
    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute(
            "SELECT id, shift_name, total_minutes, fixed_break_minutes FROM shift_config"
        ).fetchall()
    finally:
        conn.close()
    self.shift_tree.delete(*self.shift_tree.get_children())
    for shift_id, name, total, brk in rows:
        # fixed_break_minutes is nullable; treat NULL as no break.
        brk = brk or 0
        self.shift_tree.insert("", tk.END, values=(shift_id, name, total, brk, total - brk))
def on_shift_select(self, event):
    """Double-click handler: copy the selected shift row into the entry fields."""
    picked = self.shift_tree.selection()
    if not picked:
        return
    row = self.shift_tree.item(picked[0])['values']
    # (entry widget, index of the matching tree column)
    targets = (
        (self.shift_name, 1),
        (self.shift_total, 2),
        (self.shift_break, 3),
    )
    for widget, col in targets:
        widget.delete(0, tk.END)
        widget.insert(0, row[col])
def save_default_params(self):
    """Validate and store the two default parameters in ``global_params``.

    Both entries must parse as numbers; the trimmed text is what gets stored.
    """
    oee_text = self.param_oee.get().strip()
    defect_text = self.param_defect.get().strip()
    try:
        float(oee_text)
        float(defect_text)
    except ValueError:
        return messagebox.showerror("错误", "参数需为数字")
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.executemany(
            "UPDATE global_params SET param_value=? WHERE param_name=?",
            [(oee_text, 'default_oee'), (defect_text, 'defect_rate')],
        )
        conn.commit()
    finally:
        # Close even when the update raises.
        conn.close()
    self.refresh_all_ui()
    messagebox.showinfo("成功", "参数已保存")
# ================== 品番工序与CT ==================
def build_process_tab(self):
    """Build the product/process/CT tab (``self.tab2``).

    Layout, top to bottom: a filter bar (line and product comboboxes), the
    process table, an editing area split into the process form (left) and the
    cycle-time records of the selected process (right), and a status bar with
    the measured average and the bottleneck summary.
    """
    # ---- Filter bar ----
    top = ttk.Frame(self.tab2)
    top.pack(fill=tk.X, padx=5, pady=5)
    ttk.Label(top, text="产线筛选:").grid(row=0, column=0, sticky="w")
    self.filter_line = ttk.Combobox(top, state="readonly", width=25)
    self.filter_line.grid(row=0, column=1, padx=5)
    self.filter_line.bind("<<ComboboxSelected>>", self.on_filter_line)
    ttk.Label(top, text="品番:").grid(row=0, column=2, sticky="w")
    self.product_combo = ttk.Combobox(top, state="readonly", width=25)
    self.product_combo.grid(row=0, column=3, padx=5)
    self.product_combo.bind("<<ComboboxSelected>>", self.on_product_selected)
    ttk.Button(top, text="新增品番", command=self.add_product_dialog).grid(row=0, column=4, padx=5)
    ttk.Button(top, text="删除品番", command=self.delete_product).grid(row=0, column=5, padx=5)
    # ---- Vertical split: process table above, editors below ----
    paned_main = ttk.PanedWindow(self.tab2, orient=tk.VERTICAL)
    paned_main.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    f_table = ttk.Frame(paned_main)
    paned_main.add(f_table, weight=1)
    self.process_tree = ttk.Treeview(f_table, columns=("seq","name","std","meas","notes","locked"), show="headings")
    self.process_tree.heading("seq", text="序号")
    self.process_tree.heading("name", text="工序名称")
    self.process_tree.heading("std", text="标准CT(s)")
    self.process_tree.heading("meas", text="实测CT(s)")
    self.process_tree.heading("notes", text="备注")
    self.process_tree.heading("locked", text="锁定")
    self.process_tree.column("seq", width=50, anchor="center")
    self.process_tree.column("std", width=90, anchor="center")
    self.process_tree.column("meas", width=90, anchor="center")
    self.process_tree.column("locked", width=50, anchor="center")
    self.process_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    # Rows tagged 'bottleneck' (set in load_processes) are highlighted.
    self.process_tree.tag_configure('bottleneck', background='#FF6347', foreground='white')
    self.process_tree.bind("<<TreeviewSelect>>", self.on_process_selected)
    scroll = ttk.Scrollbar(f_table, command=self.process_tree.yview)
    self.process_tree.configure(yscrollcommand=scroll.set)
    scroll.pack(side=tk.RIGHT, fill=tk.Y)
    # ---- Horizontal split: process form (left), cycle times (right) ----
    paned_edit = ttk.PanedWindow(paned_main, orient=tk.HORIZONTAL)
    paned_main.add(paned_edit, weight=1)
    f_edit = ttk.LabelFrame(paned_edit, text="工序信息")
    paned_edit.add(f_edit, weight=1)
    ttk.Label(f_edit, text="工序名:").grid(row=0, column=0, sticky="e", padx=5, pady=2)
    self.proc_name = ttk.Entry(f_edit, width=18)
    self.proc_name.grid(row=0, column=1, padx=5)
    ttk.Label(f_edit, text="标准CT:").grid(row=1, column=0, sticky="e", padx=5, pady=2)
    self.proc_std = ttk.Entry(f_edit, width=10)
    self.proc_std.grid(row=1, column=1, padx=5)
    ttk.Label(f_edit, text="实测CT:").grid(row=2, column=0, sticky="e", padx=5, pady=2)
    self.proc_meas = ttk.Entry(f_edit, width=10)
    self.proc_meas.grid(row=2, column=1, padx=5)
    ttk.Label(f_edit, text="备注:").grid(row=3, column=0, sticky="e", padx=5, pady=2)
    self.proc_notes = ttk.Entry(f_edit, width=18)
    self.proc_notes.grid(row=3, column=1, padx=5)
    # Worker head-count input (newer addition)
    ttk.Label(f_edit, text="员工人数:").grid(row=4, column=0, sticky="e", padx=5, pady=2)
    self.proc_worker_entry = ttk.Entry(f_edit, width=5)
    self.proc_worker_entry.grid(row=4, column=1, sticky="w", padx=5)
    btn_frame = ttk.Frame(f_edit)
    btn_frame.grid(row=5, column=0, columnspan=2, pady=5)
    ttk.Button(btn_frame, text="新增", command=self.add_process).pack(side=tk.LEFT, padx=2)
    ttk.Button(btn_frame, text="修改", command=self.update_process).pack(side=tk.LEFT, padx=2)
    ttk.Button(btn_frame, text="删除", command=self.delete_process).pack(side=tk.LEFT, padx=2)
    ttk.Button(btn_frame, text="锁定", command=self.toggle_lock).pack(side=tk.LEFT, padx=2)
    # Cycle-time records of the process currently selected in the table
    f_cycle = ttk.LabelFrame(paned_edit, text="循环时间记录(当前工序)")
    paned_edit.add(f_cycle, weight=2)
    # The first column carries the record id used by delete_cycle_time.
    self.cycle_tree = ttk.Treeview(f_cycle, columns=("id","time","measurer","date","note"), show="headings", height=6)
    self.cycle_tree.heading("id", text="ID")
    self.cycle_tree.heading("time", text="循环时间(秒)")
    self.cycle_tree.heading("measurer", text="测量人")
    self.cycle_tree.heading("date", text="测量日期")
    self.cycle_tree.heading("note", text="备注")
    self.cycle_tree.column("id", width=30, anchor="center")
    self.cycle_tree.column("time", width=80, anchor="center")
    self.cycle_tree.column("measurer", width=80)
    self.cycle_tree.column("date", width=90)
    self.cycle_tree.pack(fill=tk.BOTH, expand=True)
    # Input row for a new cycle-time record
    f_input = ttk.Frame(f_cycle)
    f_input.pack(fill=tk.X, pady=5)
    ttk.Label(f_input, text="时间(s):").pack(side=tk.LEFT, padx=2)
    self.cycle_time_entry = ttk.Entry(f_input, width=8)
    self.cycle_time_entry.pack(side=tk.LEFT, padx=2)
    ttk.Label(f_input, text="测量人:").pack(side=tk.LEFT, padx=2)
    self.cycle_measurer_entry = ttk.Entry(f_input, width=10)
    self.cycle_measurer_entry.pack(side=tk.LEFT, padx=2)
    ttk.Label(f_input, text="日期:").pack(side=tk.LEFT, padx=2)
    self.cycle_date_entry = ttk.Entry(f_input, width=10)
    # Pre-fill with today's date in ISO format.
    self.cycle_date_entry.insert(0, datetime.date.today().isoformat())
    self.cycle_date_entry.pack(side=tk.LEFT, padx=2)
    ttk.Label(f_input, text="备注:").pack(side=tk.LEFT, padx=2)
    self.cycle_note_entry = ttk.Entry(f_input, width=12)
    self.cycle_note_entry.pack(side=tk.LEFT, padx=2)
    ttk.Button(f_input, text="添加", command=self.add_cycle_time).pack(side=tk.LEFT, padx=5)
    ttk.Button(f_input, text="删除", command=self.delete_cycle_time).pack(side=tk.LEFT, padx=5)
    # ---- Status bar ----
    bottom = ttk.Frame(self.tab2)
    bottom.pack(fill=tk.X, padx=5, pady=5)
    self.avg_label = ttk.Label(bottom, text="实测平均CT: --", font=("Arial", 10, "bold"))
    self.avg_label.pack(side=tk.LEFT, padx=20)
    self.bottleneck_label = ttk.Label(bottom, text="", font=("Arial", 10, "bold"))
    self.bottleneck_label.pack(side=tk.LEFT, padx=20)
    # Database id of the process whose cycle times are shown; set in on_process_selected.
    self.current_process_id = None
    # Must run last: it populates the comboboxes and tables created above.
    self.load_filter_lines()
def load_filter_lines(self):
    """Fill the line filter and trigger loading of the products it implies.

    An earlier selection is kept when it is still offered; otherwise the first
    line is chosen. With no lines at all the product list and both tables are
    emptied.
    """
    line_options = self.get_line_values()
    previous = self.filter_line.get() if self.filter_line['values'] else None
    self.filter_line['values'] = line_options

    if not line_options:
        self.product_combo['values'] = []
        for tree in (self.process_tree, self.cycle_tree):
            tree.delete(*tree.get_children())
        return

    if previous in line_options:
        self.filter_line.set(previous)
    else:
        self.filter_line.current(0)
    # Reload the products unless the same line is still selected.
    if not (previous and previous == self.filter_line.get()):
        self.on_filter_line()
def on_filter_line(self, event=None):
    """Reload the product combobox for the production line chosen in the filter.

    The previously chosen product is kept when it belongs to the line,
    otherwise the first product is selected. When the line has no products
    the dependent views are cleared. An empty or unknown line leaves the UI
    untouched.
    """
    line_str = self.filter_line.get()
    if not line_str:
        return
    # Combobox captions have the form "<line_no> - <line_name>".
    line_no = line_str.split(" - ")[0]
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM production_lines WHERE line_no=?", (line_no,))
        line_row = c.fetchone()
        if not line_row:
            # The connection used to stay open on this early return.
            return
        c.execute("SELECT product_model FROM products WHERE line_id=?", (line_row[0],))
        products = [r[0] for r in c.fetchall()]
    finally:
        conn.close()
    old = self.product_combo.get()
    self.product_combo['values'] = products
    if old in products:
        self.product_combo.set(old)
        self.load_processes()
    elif products:
        self.product_combo.current(0)
        self.load_processes()
    else:
        self.product_combo.set('')
        self.process_tree.delete(*self.process_tree.get_children())
        self.cycle_tree.delete(*self.cycle_tree.get_children())
        self.bottleneck_label.config(text="")
        self.avg_label.config(text="实测平均CT: --")
        # No process is shown any more; forget the previously selected one so
        # new cycle times cannot be attached to a process of another line.
        self.current_process_id = None
def add_product_dialog(self):
    """Open a modal dialog that adds a product to the line chosen in the filter."""
    selected_line = self.filter_line.get()
    if not selected_line:
        messagebox.showwarning("提示", "请先选择所属产线")
        return
    # Combobox captions have the form "<line_no> - <line_name>".
    line_no = selected_line.split(" - ")[0]

    dialog = tk.Toplevel(self)
    dialog.title("新增品番")
    dialog.geometry("300x150")
    dialog.resizable(False, False)

    ttk.Label(dialog, text="品番型号:").pack(pady=5)
    model_text = tk.StringVar()
    model_entry = ttk.Entry(dialog, textvariable=model_text, width=25)
    model_entry.pack()
    ttk.Label(dialog, text=f"所属产线: {selected_line}").pack()

    def _persist():
        """Insert the product; the dialog stays open when the model already exists."""
        new_model = model_text.get().strip()
        if not new_model:
            return messagebox.showwarning("提示", "型号不能为空")
        db = sqlite3.connect(DB_NAME)
        cur = db.cursor()
        try:
            cur.execute("INSERT INTO products (product_model, line_id) VALUES (?, (SELECT id FROM production_lines WHERE line_no=?))",
                        (new_model, line_no))
            db.commit()
            messagebox.showinfo("成功", f"品番 {new_model} 已添加")
            dialog.destroy()
            self.refresh_all_ui()
        except sqlite3.IntegrityError:
            messagebox.showerror("错误", "型号已存在")
        finally:
            db.close()

    ttk.Button(dialog, text="保存", command=_persist).pack(pady=10)
    # Make the dialog modal and block until it is closed.
    dialog.transient(self)
    dialog.grab_set()
    self.wait_window(dialog)
def delete_product(self):
    """Delete the selected product together with all of its processes.

    The confirmation text promises that the processes go too, but only the
    ``products`` row used to be removed. Processes, their cycle-time records
    and their manpower rows are now deleted explicitly in the same
    transaction. No connection in this file enables ``PRAGMA foreign_keys``,
    so the declared ``ON DELETE CASCADE`` cannot be relied upon.
    """
    product = self.product_combo.get()
    if not product:
        return
    if not messagebox.askyesno("确认", f"删除品番 {product} 及所有工序?"):
        return
    # Sub-select naming every process of the product.
    product_processes = ("SELECT id FROM product_processes WHERE product_id IN "
                         "(SELECT id FROM products WHERE product_model=?)")
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        # Children first, parents last.
        c.execute(f"DELETE FROM process_cycle_times WHERE process_id IN ({product_processes})", (product,))
        c.execute(f"DELETE FROM process_manpower WHERE process_id IN ({product_processes})", (product,))
        c.execute("DELETE FROM product_processes WHERE product_id IN (SELECT id FROM products WHERE product_model=?)", (product,))
        c.execute("DELETE FROM products WHERE product_model=?", (product,))
        conn.commit()
    finally:
        # Closing without commit discards a partially executed delete.
        conn.close()
    # The process that was being edited no longer exists.
    self.current_process_id = None
    self.refresh_all_ui()
    messagebox.showinfo("完成", "已删除")
def on_product_selected(self, event=None):
    """Combobox callback: show the processes of the newly chosen product."""
    # The event object is not needed; the product is read from the combobox.
    self.load_processes()
def load_processes(self):
    """Rebuild the process table for the product chosen in the combobox.

    The bottleneck row is highlighted and the balance rate is shown in the
    status bar. A previously selected sequence number is re-selected when it
    still exists. Otherwise the "current process" state (id and average
    label) is reset, so cycle times cannot be added to a process that is no
    longer on screen.
    """
    sel = self.process_tree.selection()
    selected_seq = None
    if sel:
        try:
            selected_seq = self.process_tree.item(sel[0])['values'][0]
        except (IndexError, tk.TclError):
            # Row without values or already removed: nothing to restore.
            pass
    self.process_tree.delete(*self.process_tree.get_children())
    self.cycle_tree.delete(*self.cycle_tree.get_children())
    # Re-established by on_process_selected below if the selection survives.
    self.current_process_id = None
    self.avg_label.config(text="实测平均CT: --")
    product = self.product_combo.get()
    if not product:
        self.bottleneck_label.config(text="")
        return
    processes, _ = get_product_processes(product)
    bottleneck_info, balance, _ = calculate_bottleneck_and_balance(processes)
    for p in processes:
        std = f"{p['std_ct']:.2f}" if p['std_ct'] is not None else ""
        meas = f"{p['meas_ct']:.2f}" if p['meas_ct'] is not None else ""
        lock = "是" if p['locked'] else "否"
        item = self.process_tree.insert("", tk.END, values=(p['seq'], p['name'], std, meas, p['notes'], lock))
        if bottleneck_info and p['seq'] == bottleneck_info['seq']:
            self.process_tree.item(item, tags=('bottleneck',))
    if bottleneck_info:
        self.bottleneck_label.config(text=f"瓶颈: {bottleneck_info['name']} | 平衡率: {balance:.2f}%")
    else:
        self.bottleneck_label.config(text="无法计算瓶颈(无有效CT)")
    if selected_seq:
        for item in self.process_tree.get_children():
            if self.process_tree.item(item)['values'][0] == selected_seq:
                self.process_tree.selection_set(item)
                self.process_tree.focus(item)
                self.on_process_selected(None)
                break
def on_process_selected(self, event):
    """Selection handler: load the chosen process into the form and list its cycle times."""
    picked = self.process_tree.selection()
    if not picked:
        return
    seq = self.process_tree.item(picked[0])['values'][0]
    processes, _ = get_product_processes(self.product_combo.get())

    # Locate the process with the selected sequence number.
    proc = None
    for candidate in processes:
        if candidate['seq'] == seq:
            proc = candidate
            break
    if not proc:
        return
    self.current_process_id = proc['db_id']

    self.proc_name.delete(0, tk.END)
    self.proc_name.insert(0, proc['name'])

    # The CT fields stay empty when no value is stored.
    for entry, ct_value in ((self.proc_std, proc['std_ct']), (self.proc_meas, proc['meas_ct'])):
        entry.delete(0, tk.END)
        if ct_value is not None:
            entry.insert(0, f"{ct_value:.2f}")

    self.proc_notes.delete(0, tk.END)
    self.proc_notes.insert(0, proc['notes'] if proc['notes'] else "")

    # Show the worker head-count
    self.proc_worker_entry.delete(0, tk.END)
    self.proc_worker_entry.insert(0, str(proc.get('worker_count', 1)))

    self.load_cycle_times(proc['db_id'])
def load_cycle_times(self, process_id):
    """List the cycle-time records of ``process_id`` and show their average."""
    self.cycle_tree.delete(*self.cycle_tree.get_children())
    records = get_cycle_times(process_id)
    for rec in records:
        row = (rec['id'], f"{rec['time']:.2f}", rec['measurer'], rec['measure_date'], rec['note'])
        self.cycle_tree.insert("", tk.END, values=row)
    average = calc_avg_cycle_time(records)
    if average is None:
        caption = "实测平均CT: --"
    else:
        caption = f"实测平均CT: {average:.2f} 秒"
    self.avg_label.config(text=caption)
def add_cycle_time(self):
    """Store a new cycle-time measurement for the currently selected process.

    The time must be a finite number greater than zero. NaN, infinity and
    non-positive values are rejected with the same message as unparsable
    text. Afterwards the record list and the process's measured CT are
    refreshed.
    """
    if not self.current_process_id:
        messagebox.showwarning("提示", "请先选择一道工序")
        return
    time_str = self.cycle_time_entry.get().strip()
    if not time_str:
        return
    try:
        cycle_seconds = float(time_str)
    except ValueError:
        messagebox.showerror("错误", "请输入有效数字")
        return
    # float() also accepts "nan"/"inf"; none of those is a usable duration.
    if not math.isfinite(cycle_seconds) or cycle_seconds <= 0:
        messagebox.showerror("错误", "请输入有效数字")
        return
    measurer = self.cycle_measurer_entry.get().strip()
    date = self.cycle_date_entry.get().strip()
    note = self.cycle_note_entry.get().strip()
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            "INSERT INTO process_cycle_times (process_id, cycle_time, note, measurer, measure_date) VALUES (?,?,?,?,?)",
            (self.current_process_id, cycle_seconds, note, measurer, date),
        )
        conn.commit()
    finally:
        conn.close()
    self.load_cycle_times(self.current_process_id)
    self.cycle_time_entry.delete(0, tk.END)
    self.auto_update_meas_from_cycles()
def delete_cycle_time(self):
    """Delete the cycle-time row selected in ``self.cycle_tree``.

    Does nothing when no row is selected. After the delete, the cycle table
    is reloaded and the measured CT is refreshed from the remaining cycles.
    """
    sel = self.cycle_tree.selection()
    if not sel:
        return
    # The first column of each row holds the process_cycle_times primary key.
    ct_id = self.cycle_tree.item(sel[0])['values'][0]
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("DELETE FROM process_cycle_times WHERE id=?", (ct_id,))
        conn.commit()
    finally:
        # Close even when the DELETE fails so the connection is never leaked.
        conn.close()
    self.load_cycle_times(self.current_process_id)
    self.auto_update_meas_from_cycles()
def auto_update_meas_from_cycles(self):
    """Write the average of the stored cycles into the process's measured CT.

    Does nothing when no process is selected or when
    ``calc_avg_cycle_time`` returns ``None``; in the latter case the
    previously stored measured CT is left untouched. On success the average
    (rounded to 2 decimals) is stored and the process list is reloaded.
    """
    if not self.current_process_id:
        return
    cycles = get_cycle_times(self.current_process_id)
    avg = calc_avg_cycle_time(cycles)
    if avg is None:
        return
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("UPDATE product_processes SET measured_ct_sec=? WHERE id=?",
                     (round(avg, 2), self.current_process_id))
        conn.commit()
    finally:
        # Close even when the UPDATE fails so the connection is never leaked.
        conn.close()
    self.load_processes()
def refresh_meas_from_cycles(self):
    """Button handler: recompute the measured CT of the selected process.

    Warns the user when no process is selected; otherwise delegates to
    ``auto_update_meas_from_cycles``.
    """
    if self.current_process_id:
        self.auto_update_meas_from_cycles()
        return
    messagebox.showwarning("提示", "请选择工序")
def add_process(self):
    """Append a new process to the selected product.

    The new row gets ``seq = MAX(seq) + 1`` within the product, and the
    worker count is stored in ``process_manpower``. The name, standard-CT
    and measured-CT entries are cleared afterwards.

    Shows a warning when no product is selected or the name is empty, and an
    error when a CT entry is not a number or the product no longer exists.
    A worker count that is not a plain non-negative integer falls back to 1.
    """
    product = self.product_combo.get()
    if not product:
        return messagebox.showwarning("提示", "请选择品番")
    name = self.proc_name.get().strip()
    if not name:
        return messagebox.showwarning("提示", "工序名称必填")
    std = self.proc_std.get().strip()
    meas = self.proc_meas.get().strip()
    try:
        std_ct = float(std) if std else None
        meas_ct = float(meas) if meas else None
    except ValueError:
        # Previously an invalid number escaped as an unhandled callback error.
        return messagebox.showerror("错误", "请输入有效数字")
    notes = self.proc_notes.get().strip()
    worker_str = self.proc_worker_entry.get().strip()
    # isdecimal() matches exactly what int() can parse; isdigit() also accepts
    # characters such as superscript digits, which int() rejects.
    worker_count = int(worker_str) if worker_str.isdecimal() else 1
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM products WHERE product_model=?", (product,))
        row = c.fetchone()
        if row is None:
            return messagebox.showerror("错误", "品番不存在")
        prod_id = row[0]
        c.execute("SELECT MAX(seq) FROM product_processes WHERE product_id=?", (prod_id,))
        # MAX() yields NULL for a product without processes.
        max_seq = c.fetchone()[0] or 0
        seq = max_seq + 1
        c.execute("INSERT INTO product_processes (product_id, seq, process_name, standard_ct_sec, measured_ct_sec, notes) VALUES (?,?,?,?,?,?)",
                  (prod_id, seq, name, std_ct, meas_ct, notes))
        process_id = c.lastrowid
        # Store the worker count for the new process.
        c.execute("INSERT OR REPLACE INTO process_manpower (process_id, worker_count) VALUES (?,?)", (process_id, worker_count))
        conn.commit()
    finally:
        conn.close()
    self.load_processes()
    self.proc_name.delete(0, tk.END)
    self.proc_std.delete(0, tk.END)
    self.proc_meas.delete(0, tk.END)
def update_process(self):
    """Save the edited fields of the currently selected process.

    Updates name, standard CT, measured CT and notes in
    ``product_processes`` and the worker count in ``process_manpower``, then
    reloads the process list.

    Shows a warning when no process is selected or the name is empty, and an
    error when a CT entry is not a number. A worker count that is not a
    plain non-negative integer falls back to 1.
    """
    if not self.current_process_id:
        return messagebox.showwarning("提示", "请选择要修改的工序")
    name = self.proc_name.get().strip()
    if not name:
        return messagebox.showwarning("提示", "工序名称不能为空")
    std = self.proc_std.get().strip()
    meas = self.proc_meas.get().strip()
    try:
        std_ct = float(std) if std else None
        meas_ct = float(meas) if meas else None
    except ValueError:
        # Previously an invalid number escaped as an unhandled callback error.
        return messagebox.showerror("错误", "请输入有效数字")
    notes = self.proc_notes.get().strip()
    worker_str = self.proc_worker_entry.get().strip()
    # isdecimal() matches exactly what int() can parse.
    worker_count = int(worker_str) if worker_str.isdecimal() else 1
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("UPDATE product_processes SET process_name=?, standard_ct_sec=?, measured_ct_sec=?, notes=? WHERE id=?",
                  (name, std_ct, meas_ct, notes, self.current_process_id))
        c.execute("INSERT OR REPLACE INTO process_manpower (process_id, worker_count) VALUES (?,?)", (self.current_process_id, worker_count))
        conn.commit()
    finally:
        conn.close()
    self.load_processes()
def delete_process(self):
    """Delete the selected process with its cycle times and manpower row.

    Asks for confirmation first. After a successful delete the selection is
    reset, the cycle table is emptied, the average label is reset and the
    process list is reloaded.
    """
    if not self.current_process_id:
        return messagebox.showwarning("提示", "请选择工序")
    if not messagebox.askyesno("确认", "删除工序将同时删除其循环时间数据,继续?"):
        return
    process_id = self.current_process_id
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("DELETE FROM process_cycle_times WHERE process_id=?", (process_id,))
        c.execute("DELETE FROM product_processes WHERE id=?", (process_id,))
        c.execute("DELETE FROM process_manpower WHERE process_id=?", (process_id,))
        conn.commit()
    finally:
        conn.close()
    self.current_process_id = None
    self.cycle_tree.delete(*self.cycle_tree.get_children())
    # The cycle table is now empty, so the old average must not stay visible.
    self.avg_label.config(text="实测平均CT: --")
    self.load_processes()
def toggle_lock(self):
    """Flip the ``locked`` flag of the currently selected process.

    Does nothing when no process is selected or when the row no longer
    exists in ``product_processes``; otherwise stores the inverted flag
    (0/1) and reloads the process list.
    """
    if not self.current_process_id:
        return
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT locked FROM product_processes WHERE id=?", (self.current_process_id,))
        row = c.fetchone()
        if row is None:
            # The row vanished; previously this crashed on ``None[0]``.
            return
        new = 0 if row[0] else 1
        c.execute("UPDATE product_processes SET locked=? WHERE id=?", (new, self.current_process_id))
        conn.commit()
    finally:
        conn.close()
    self.load_processes()
# ================== 效率计算 ==================
def build_efficiency_tab(self):
    """Build the efficiency tab inside ``self.tab3``.

    Two single-row groups are created: one whose button calls
    ``calc_availability`` and one whose button calls ``calc_operability``.
    The entries and result labels those handlers read and write are stored
    on ``self``.
    """
    root = ttk.Frame(self.tab3)
    root.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
    result_font = ("Arial", 10, "bold")

    def add_caption(box, text, column):
        ttk.Label(box, text=text).grid(row=0, column=column, sticky="w", padx=5, pady=5)

    def add_entry(box, width, column):
        widget = ttk.Entry(box, width=width)
        widget.grid(row=0, column=column, padx=5)
        return widget

    def add_result(box, text, column):
        widget = ttk.Label(box, text=text, font=result_font)
        widget.grid(row=0, column=column, padx=5)
        return widget

    # Group 1: actual time / planned time.
    avail_box = ttk.LabelFrame(root, text="稼动率计算")
    avail_box.pack(fill=tk.X, pady=10)
    add_caption(avail_box, "产品直接稼动时间(分):", 0)
    self.avail_actual_entry = add_entry(avail_box, 12, 1)
    add_caption(avail_box, "计划稼动时间(分):", 2)
    self.avail_planned_entry = add_entry(avail_box, 12, 3)
    ttk.Button(avail_box, text="计算稼动率", command=self.calc_availability).grid(row=0, column=4, padx=10)
    self.avail_result = add_result(avail_box, "稼动率: --", 5)

    # Group 2: CT, quantity and direct running time.
    oper_box = ttk.LabelFrame(root, text="可动率计算")
    oper_box.pack(fill=tk.X, pady=10)
    add_caption(oper_box, "CT(秒):", 0)
    self.oper_ct_entry = add_entry(oper_box, 10, 1)
    add_caption(oper_box, "生产数量:", 2)
    self.oper_qty_entry = add_entry(oper_box, 10, 3)
    add_caption(oper_box, "直接稼动时间(分):", 4)
    self.oper_direct_time_entry = add_entry(oper_box, 12, 5)
    ttk.Button(oper_box, text="计算可动率", command=self.calc_operability).grid(row=0, column=6, padx=10)
    self.oper_result = add_result(oper_box, "可动率: --", 7)
def calc_availability(self):
    """Compute actual time / planned time and show it as a percentage.

    Reads both minute values from their entries and writes the result to
    ``self.avail_result`` with two decimals. A non-numeric entry or a
    planned time of 0 shows an error dialog instead.
    """
    try:
        actual = float(self.avail_actual_entry.get())
        planned = float(self.avail_planned_entry.get())
        if planned == 0:
            raise ValueError("planned time must not be zero")
    except ValueError:
        # Only input problems are reported here; any other failure now
        # surfaces instead of being hidden by a bare ``except``.
        messagebox.showerror("错误", "请输入有效的数字(计划时间不能为0)")
        return
    avail = actual / planned
    self.avail_result.config(text=f"稼动率: {avail*100:.2f}%")
def calc_operability(self):
    """Compute (CT * quantity) / (direct minutes * 60) as a percentage.

    CT is read in seconds and the direct running time in minutes, so the
    denominator is converted to seconds. The result is written to
    ``self.oper_result`` with two decimals. A non-numeric entry or a direct
    time of 0 shows an error dialog instead.
    """
    try:
        ct = float(self.oper_ct_entry.get())
        qty = float(self.oper_qty_entry.get())
        direct = float(self.oper_direct_time_entry.get())
        if direct == 0:
            raise ValueError("direct time must not be zero")
    except ValueError:
        # Only input problems are reported here; any other failure now
        # surfaces instead of being hidden by a bare ``except``.
        messagebox.showerror("错误", "请输入有效数字(直接稼动时间不能为0)")
        return
    # direct != 0 is guaranteed above, so the denominator cannot be zero.
    oper = (ct * qty) / (direct * 60)
    self.oper_result.config(text=f"可动率: {oper*100:.2f}%")
# ================== 产能测算(含目标反推) ==================
def build_capacity_tab(self):
    """Build the capacity tab inside ``self.tab4``.

    Creates the input grid (line, product, shift, shifts per day, planned
    uptime, efficiency, defect rate, planned daily output), the two-column
    result table filled by ``calc_capacity``, and the target-output group
    whose button calls ``reverse_calc``. Finishes by loading the line and
    shift choices.
    """
    calc_box = ttk.LabelFrame(self.tab4, text="产能核心测算", padding=10)
    calc_box.pack(fill=tk.X, padx=10, pady=10)

    def caption(text, row, column):
        ttk.Label(calc_box, text=text).grid(row=row, column=column, sticky="e", padx=2)

    # Row 0: production line and product.
    caption("产线:", 0, 0)
    self.cap_line_combo = ttk.Combobox(calc_box, state="readonly", width=25)
    self.cap_line_combo.grid(row=0, column=1, padx=2)
    self.cap_line_combo.bind("<<ComboboxSelected>>", self.cap_on_line)
    caption("品番:", 0, 2)
    self.cap_product_combo = ttk.Combobox(calc_box, state="readonly", width=25)
    self.cap_product_combo.grid(row=0, column=3, padx=2)

    # Row 1: shift and number of shifts per day.
    caption("班次:", 1, 0)
    self.cap_shift = ttk.Combobox(calc_box, state="readonly", width=12)
    self.cap_shift.grid(row=1, column=1, padx=2)
    self.cap_shift.bind("<<ComboboxSelected>>", self.update_cap_planned_time)
    caption("日开班数:", 1, 2)
    self.cap_shifts = ttk.Spinbox(calc_box, from_=1, to=10, width=5, command=self.update_cap_planned_time)
    self.cap_shifts.set(1)
    self.cap_shifts.grid(row=1, column=3, padx=2)

    # Row 2: read-only planned uptime and the efficiency percentage.
    caption("计划稼动时间(分/班):", 2, 0)
    self.planned_uptime_var = tk.StringVar()
    self.planned_uptime_entry = ttk.Entry(calc_box, textvariable=self.planned_uptime_var, state='readonly', width=10)
    self.planned_uptime_entry.grid(row=2, column=1, padx=2)
    caption("稼动率(%):", 2, 2)
    self.cap_efficiency = ttk.Entry(calc_box, width=8)
    self.cap_efficiency.insert(0, "100")
    self.cap_efficiency.grid(row=2, column=3, padx=2)

    # Row 3: defect rate and planned daily output.
    caption("不良率(%):", 3, 0)
    self.cap_defect = ttk.Entry(calc_box, width=8)
    # NOTE(review): the stored default is inserted unchanged under a "%"
    # label, and calc_capacity divides this entry by 100. If the stored
    # value is a fraction (the seeded default is "0.02"), it is read as
    # 0.02 % rather than 2 % - confirm the intended unit.
    default_defect = get_default_params()["defect_rate"]
    self.cap_defect.insert(0, default_defect)
    self.cap_defect.grid(row=3, column=1, padx=2)
    caption("计划日产量(PCS):", 3, 2)
    self.plan_output_entry = ttk.Entry(calc_box, width=10)
    self.plan_output_entry.grid(row=3, column=3, padx=2)

    ttk.Button(calc_box, text="计算产能", command=self.calc_capacity).grid(row=4, column=0, columnspan=4, pady=10)

    # Result table.
    self.cap_result = ttk.Treeview(self.tab4, columns=("item","value"), show="headings", height=8)
    for column_id, title in (("item", "项目"), ("value", "数值")):
        self.cap_result.heading(column_id, text=title)
    self.cap_result.column("item", width=250)
    self.cap_result.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)

    # Target-output group.
    target_box = ttk.LabelFrame(self.tab4, text="目标产量反推", padding=10)
    target_box.pack(fill=tk.X, padx=10, pady=5)
    ttk.Label(target_box, text="目标日产量(PCS):").grid(row=0, column=0)
    self.reverse_target_entry = ttk.Entry(target_box, width=12)
    self.reverse_target_entry.grid(row=0, column=1, padx=5)
    ttk.Button(target_box, text="反推所需条件", command=self.reverse_calc).grid(row=0, column=2, padx=10)
    self.reverse_result_label = ttk.Label(target_box, text="")
    self.reverse_result_label.grid(row=1, column=0, columnspan=3, pady=5)

    self.load_cap_lines()
def load_cap_lines(self):
    """Fill the capacity tab's line and shift comboboxes.

    Line choices come from ``self.get_line_values()``; shift names come
    from the ``shift_config`` table. In each list the first entry is
    selected when one exists, and the dependent widgets are refreshed.
    """
    line_box = self.cap_line_combo
    line_box['values'] = self.get_line_values()
    if line_box.cget('values'):
        line_box.current(0)
        self.cap_on_line()

    conn = sqlite3.connect(DB_NAME)
    shift_names = [name for (name,) in conn.execute("SELECT shift_name FROM shift_config")]
    conn.close()

    self.cap_shift['values'] = shift_names
    if shift_names:
        self.cap_shift.current(0)
        self.update_cap_planned_time()
def cap_on_line(self, event=None):
    """Reload the product choices after the capacity tab's line changes.

    The line number is the part of the combobox text before the first
    " - ". The previous product stays selected if it belongs to the new
    line; otherwise the first product is selected, or the selection is
    cleared when the line has no products.

    Args:
        event: Tk event when called from a binding; unused.
    """
    line_str = self.cap_line_combo.get()
    if not line_str:
        return
    line_no = line_str.split(" - ")[0]
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT product_model FROM products WHERE line_id=(SELECT id FROM production_lines WHERE line_no=?)", (line_no,))
        products = [r[0] for r in c.fetchall()]
    finally:
        conn.close()
    old = self.cap_product_combo.get()
    self.cap_product_combo['values'] = products
    if old in products:
        self.cap_product_combo.set(old)
    elif products:
        self.cap_product_combo.current(0)
    else:
        # Replacing 'values' does not change the displayed text, so without
        # this the product of the previous line would stay selected.
        self.cap_product_combo.set("")
def update_cap_planned_time(self, *args):
    """Show the planned uptime (minutes) of the selected shift.

    Accepts and ignores any positional arguments so it can serve both as a
    Tk event binding and as a Spinbox ``command``. Does nothing when no
    shift is selected.
    """
    shift_name = self.cap_shift.get()
    if shift_name:
        minutes = calc_planned_uptime_min(shift_name)
        self.planned_uptime_var.set(str(minutes))
def calc_capacity(self):
    """Compute capacity figures for the selected product and shift.

    Reads efficiency, defect rate (both entered as percentages) and shifts
    per day, finds the bottleneck via ``calculate_bottleneck_and_balance``,
    gets the capacity figures from ``calculate_capacity``, fills the result
    table and appends a row to ``capacity_logs``.

    When a positive planned daily output is entered, the takt time
    (planned seconds per day / output) is added to the table; an
    unparsable value is ignored.
    """
    product = self.cap_product_combo.get()
    if not product:
        return messagebox.showwarning("提示", "请选择品番")
    shift = self.cap_shift.get()
    if not shift:
        return messagebox.showwarning("提示", "请选择班次")
    try:
        efficiency = float(self.cap_efficiency.get()) / 100.0
        defect = float(self.cap_defect.get()) / 100.0
        shifts_per_day = int(self.cap_shifts.get())
    except ValueError:
        return messagebox.showerror("错误", "请输入有效数字")
    # The second return value is the product's line id; it is stored in the
    # log so the log export can resolve the line name.
    processes, line_id = get_product_processes(product)
    if not processes:
        return messagebox.showwarning("提示", "该品番无工序数据")
    bottleneck_info, balance, bottleneck_ct = calculate_bottleneck_and_balance(processes)
    if bottleneck_ct == 0:
        return messagebox.showwarning("提示", "无法计算瓶颈CT")
    res = calculate_capacity(bottleneck_ct, shift, efficiency, defect, shifts_per_day)
    planned_uptime_min = calc_planned_uptime_min(shift)
    daily_planned_seconds = planned_uptime_min * 60 * shifts_per_day
    tt = None
    target_str = self.plan_output_entry.get().strip()
    if target_str:
        try:
            target = float(target_str)
        except ValueError:
            # The planned output is optional; an unparsable value only
            # means the takt-time row is omitted.
            target = 0.0
        if target > 0:
            tt = daily_planned_seconds / target
    self.cap_result.delete(*self.cap_result.get_children())
    data = [
        ("瓶颈工序", bottleneck_info['name'] if bottleneck_info else "无"),
        ("瓶颈CT(秒)", f"{bottleneck_ct:.2f}"),
        ("线平衡率", f"{balance:.2f}%"),
        ("理论小时产能 (PCS/h)", res['hourly_theory']),
        ("实际小时产能 (PCS/h)", res['hourly_actual']),
        ("单班产能 (PCS)", res['shift_cap']),
        ("日产能 (PCS)", res['daily_cap']),
        ("月产能 (PCS)", res['monthly_cap']),
        ("稼动率", f"{efficiency*100:.2f}%"),
        ("不良率", f"{defect*100:.2f}%"),
    ]
    if tt is not None:
        data.append(("生产节拍(TT,秒)", f"{tt:.2f}"))
    for d in data:
        self.cap_result.insert("", tk.END, values=d)
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("INSERT INTO capacity_logs (product_model, shift_name, bottleneck_ct, hourly_cap, shift_cap, calc_time, line_id) VALUES (?,?,?,?,?,?,?)",
                     (product, shift, bottleneck_ct, res['hourly_actual'], res['shift_cap'],
                      datetime.datetime.now().isoformat(), line_id))
        conn.commit()
    finally:
        conn.close()
def reverse_calc(self):
    """Work backwards from a target daily output to the required CT.

    Using the selected shift's planned uptime and the efficiency and defect
    percentages from the capacity inputs, tries 1 to 10 shifts per day and
    stops at the first shift count whose required CT is at least 0.5
    seconds. The result, or a message that even 10 shifts are not enough,
    is written to ``self.reverse_result_label``.
    """
    try:
        target = float(self.reverse_target_entry.get())
    except ValueError:
        messagebox.showerror("错误", "请输入有效的目标日产量")
        return
    # A zero target used to raise ZeroDivisionError; negative or NaN targets
    # produced a misleading "cannot be reached" message.
    if not target > 0:
        messagebox.showerror("错误", "请输入有效的目标日产量")
        return
    shift = self.cap_shift.get()
    if not shift:
        messagebox.showerror("错误", "请选择班次")
        return
    try:
        oee = float(self.cap_efficiency.get()) / 100.0
        defect = float(self.cap_defect.get()) / 100.0
    except ValueError:
        messagebox.showerror("错误", "稼动率、不良率必须为数字")
        return
    planned_min = calc_planned_uptime_min(shift)
    effective_hour = planned_min / 60.0
    for shifts in range(1, 11):
        ct_required = (3600 * effective_hour * oee * (1 - defect) * shifts) / target
        if ct_required >= 0.5:
            break
    else:
        # No shift count up to 10 gave a CT of at least 0.5 s.
        self.reverse_result_label.config(text=f"即使开设10班也无法达到目标,请降低目标或优化工艺")
        return
    info = (f"达成日产量 {target} PCS 需:\n"
            f" - CT ≤ {ct_required:.2f} 秒\n"
            f" - 开班数 ≥ {shifts}\n"
            f" - 稼动率 ≥ {oee*100:.2f}%\n"
            f" - 不良率 ≤ {defect*100:.2f}%")
    self.reverse_result_label.config(text=info)
# ================== 线平衡分析(完整专业版) ==================
def build_balance_analysis_tab(self):
    """Build the line-balance tab inside ``self.tab6``.

    Layout: a top bar (line, product, target output, shift, shift count and
    action buttons) above a horizontal paned window holding the matplotlib
    chart on the left and, on the right, the indicator text, the
    improvement simulation inputs, the advice text and snapshot management.
    Finishes by loading the combobox choices, which draws the first chart.
    """
    # Must exist before load_balance_combos() runs: that call draws the
    # first chart and stores the loaded processes here. Initialising it
    # afterwards wiped the loaded data, so simulation and merge suggestions
    # reported "no data" until a manual refresh.
    self.current_processes_sim = []
    top = ttk.Frame(self.tab6)
    top.pack(fill=tk.X, padx=5, pady=5)
    ttk.Label(top, text="产线:").grid(row=0, column=0, sticky="w")
    self.balance_line_combo = ttk.Combobox(top, state="readonly", width=25)
    self.balance_line_combo.grid(row=0, column=1, padx=5)
    self.balance_line_combo.bind("<<ComboboxSelected>>", self.on_balance_line)
    ttk.Label(top, text="品番:").grid(row=0, column=2, sticky="w")
    self.balance_product_combo = ttk.Combobox(top, state="readonly", width=25)
    self.balance_product_combo.grid(row=0, column=3, padx=5)
    self.balance_product_combo.bind("<<ComboboxSelected>>", self.on_balance_product)
    ttk.Label(top, text="目标日产量(计算TT):").grid(row=1, column=0, sticky="w")
    self.balance_target_entry = ttk.Entry(top, width=10)
    self.balance_target_entry.grid(row=1, column=1, sticky="w", padx=5)
    ttk.Label(top, text="班次:").grid(row=1, column=2, sticky="w")
    self.balance_shift_combo = ttk.Combobox(top, state="readonly", width=12)
    self.balance_shift_combo.grid(row=1, column=3, sticky="w", padx=5)
    self.balance_shift_combo.bind("<<ComboboxSelected>>", self.update_balance_tt)
    ttk.Label(top, text="开班数:").grid(row=1, column=4, sticky="w")
    self.balance_shifts_spin = ttk.Spinbox(top, from_=1, to=10, width=4, command=self.update_balance_tt)
    self.balance_shifts_spin.set(1)
    self.balance_shifts_spin.grid(row=1, column=5, sticky="w")
    ttk.Button(top, text="刷新图表", command=self.refresh_balance_chart).grid(row=2, column=0, columnspan=2, pady=5)
    ttk.Button(top, text="导出图表图片", command=self.export_chart_image).grid(row=2, column=2, columnspan=2, pady=5)
    ttk.Button(top, text="自动合并建议", command=self.auto_merge_suggestion).grid(row=2, column=4, columnspan=2, pady=5)
    paned = ttk.PanedWindow(self.tab6, orient=tk.HORIZONTAL)
    paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
    # Left pane: chart.
    fig_frame = ttk.Frame(paned)
    paned.add(fig_frame, weight=3)
    self.balance_figure = Figure(figsize=(7, 4.5), dpi=100)
    self.balance_ax = self.balance_figure.add_subplot(111)
    self.balance_canvas = FigureCanvasTkAgg(self.balance_figure, master=fig_frame)
    self.balance_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
    # Right pane: indicators and improvement tools.
    right_frame = ttk.Frame(paned)
    paned.add(right_frame, weight=1)
    info_frame = ttk.LabelFrame(right_frame, text="关键指标与负荷率")
    info_frame.pack(fill=tk.X, padx=5, pady=5)
    self.balance_info_label = ttk.Label(info_frame, text="", font=("Arial", 10))
    self.balance_info_label.pack(pady=5)
    sim_frame = ttk.LabelFrame(right_frame, text="改善模拟")
    sim_frame.pack(fill=tk.X, padx=5, pady=5)
    ttk.Label(sim_frame, text="选择工序:").grid(row=0, column=0, sticky="w", padx=2)
    self.sim_process_combo = ttk.Combobox(sim_frame, width=15)
    self.sim_process_combo.grid(row=0, column=1, padx=2)
    ttk.Label(sim_frame, text="新CT(秒):").grid(row=1, column=0, sticky="w", padx=2)
    self.sim_ct_entry = ttk.Entry(sim_frame, width=10)
    self.sim_ct_entry.grid(row=1, column=1, padx=2)
    ttk.Button(sim_frame, text="应用模拟", command=self.apply_simulation).grid(row=2, column=0, columnspan=2, pady=5)
    ttk.Button(sim_frame, text="恢复原始", command=self.refresh_balance_chart).grid(row=3, column=0, columnspan=2)
    advice_frame = ttk.LabelFrame(right_frame, text="改善建议")
    advice_frame.pack(fill=tk.X, padx=5, pady=5)
    self.advice_label = ttk.Label(advice_frame, text="", wraplength=200)
    self.advice_label.pack(pady=5)
    snap_frame = ttk.LabelFrame(right_frame, text="快照管理")
    snap_frame.pack(fill=tk.X, padx=5, pady=5)
    ttk.Button(snap_frame, text="保存当前快照", command=self.save_snapshot).pack(pady=2)
    self.snapshot_combo = ttk.Combobox(snap_frame, state="readonly", width=20)
    self.snapshot_combo.pack(pady=2)
    ttk.Button(snap_frame, text="加载快照", command=self.load_snapshot).pack(pady=2)
    ttk.Button(snap_frame, text="对比快照(与当前)", command=self.compare_snapshot).pack(pady=2)
    self.load_balance_combos()
def load_balance_combos(self):
    """Fill the line-balance tab's line and shift comboboxes.

    Line choices come from ``self.get_line_values()``; shift names come
    from the ``shift_config`` table. In each list the first entry is
    selected when one exists, and the matching handler is invoked so the
    dependent widgets and the chart follow.
    """
    line_box = self.balance_line_combo
    line_box['values'] = self.get_line_values()
    if line_box.cget('values'):
        line_box.current(0)
        self.on_balance_line()

    conn = sqlite3.connect(DB_NAME)
    shift_names = [name for (name,) in conn.execute("SELECT shift_name FROM shift_config")]
    conn.close()

    self.balance_shift_combo['values'] = shift_names
    if shift_names:
        self.balance_shift_combo.current(0)
        self.update_balance_tt()
def on_balance_line(self, event=None):
    """Reload the product choices after the balance tab's line changes.

    The line number is the part of the combobox text before the first
    " - ". The previous product stays selected if it belongs to the new
    line; otherwise the first product is selected, or the selection is
    cleared when the line has no products. The chart is redrawn afterwards.

    Args:
        event: Tk event when called from a binding; unused.
    """
    line_str = self.balance_line_combo.get()
    if not line_str:
        return
    line_no = line_str.split(" - ")[0]
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT product_model FROM products WHERE line_id=(SELECT id FROM production_lines WHERE line_no=?)", (line_no,))
        products = [r[0] for r in c.fetchall()]
    finally:
        conn.close()
    old = self.balance_product_combo.get()
    self.balance_product_combo['values'] = products
    if old in products:
        self.balance_product_combo.set(old)
    elif products:
        self.balance_product_combo.current(0)
    else:
        # Replacing 'values' does not change the displayed text, so without
        # this the chart would keep showing the previous line's product.
        self.balance_product_combo.set("")
    self.refresh_balance_chart()
def on_balance_product(self, event=None):
    """Redraw the balance chart when another product is selected.

    Args:
        event: Tk event when called from a binding; unused.
    """
    redraw = self.refresh_balance_chart
    redraw()
def update_balance_tt(self, *args):
    """Redraw the balance chart after the shift or shift count changes.

    Accepts and ignores any positional arguments so it can serve both as a
    Tk event binding and as a Spinbox ``command``.
    """
    redraw = self.refresh_balance_chart
    redraw()
def refresh_balance_chart(self):
    """Redraw the balance chart and side panels for the selected product.

    Loads the product's processes, stores private copies in
    ``self.current_processes_sim`` and draws one bar per process (standard
    CT, else measured CT, else 0). The bottleneck bar is red, with
    horizontal lines for the bottleneck CT and, when a positive target
    output and a shift are given, the takt time. Also updates the indicator
    text with per-process load rates, the advice text, the simulation
    combobox and the snapshot list.

    With no product or no processes the chart is cleared and the advice and
    simulation state are reset.
    """
    product = self.balance_product_combo.get()
    processes = None
    if product:
        processes, _ = get_product_processes(product)
    if not processes:
        self.balance_ax.clear()
        self.balance_canvas.draw()
        self.balance_info_label.config(text="无工序数据" if product else "")
        # Do not leave the previous product's advice or simulation data behind.
        self.advice_label.config(text="")
        self.current_processes_sim = []
        return
    # Private copies so a later simulation never mutates the loaded rows.
    self.current_processes_sim = [dict(p) for p in processes]
    bottleneck_info, balance, bottleneck_ct = calculate_bottleneck_and_balance(processes)
    shift = self.balance_shift_combo.get()
    try:
        shifts_per_day = int(self.balance_shifts_spin.get())
    except ValueError:
        # Empty or hand-typed non-integer spinbox text falls back to one shift.
        shifts_per_day = 1
    tt = None
    target_str = self.balance_target_entry.get().strip()
    if target_str and shift:
        try:
            target = float(target_str)
        except ValueError:
            # The target is optional; an unparsable value means "no takt line".
            target = 0.0
        if target > 0:
            tt = calc_planned_uptime_min(shift) * 60 * shifts_per_day / target
    # Draw the chart.
    self.balance_ax.clear()
    names = [f"{p['seq']}-{p['name']}" for p in processes]
    ct_values = [p["std_ct"] if p["std_ct"] is not None else (p["meas_ct"] if p["meas_ct"] is not None else 0) for p in processes]
    colors = ['#d62728' if bottleneck_info and p['seq'] == bottleneck_info['seq'] else '#1f77b4'
              for p in processes]
    bars = self.balance_ax.bar(names, ct_values, color=colors, edgecolor='black', linewidth=0.5)
    has_legend_entries = False
    if bottleneck_ct > 0:
        self.balance_ax.axhline(y=bottleneck_ct, color='red', linestyle='--', linewidth=2, label=f'瓶颈CT={bottleneck_ct:.1f}s')
        has_legend_entries = True
    if tt:
        self.balance_ax.axhline(y=tt, color='green', linestyle=':', linewidth=2, label=f'TT={tt:.2f}s')
        has_legend_entries = True
    self.balance_ax.set_ylabel('时间 (秒)', fontsize=10)
    self.balance_ax.set_title(f'{product} 线平衡图 (平衡率={balance:.2f}%)', fontsize=11)
    if has_legend_entries:
        # legend() with no labelled artists only emits a warning.
        self.balance_ax.legend(loc='upper right')
    # Value label above each bar.
    for bar, val in zip(bars, ct_values):
        self.balance_ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3, f'{val:.2f}',
                             ha='center', va='bottom', fontsize=8)
    self.balance_figure.tight_layout()
    self.balance_canvas.draw()
    # Load rate of each process relative to the takt time, else the bottleneck CT.
    ref_ct = tt if tt else bottleneck_ct
    load_lines = ["负荷率 (CT/基准):"]
    for p in processes:
        ct = p["std_ct"] if p["std_ct"] is not None else p["meas_ct"]
        if ct and ref_ct and ref_ct > 0:
            rate = ct / ref_ct * 100
            load_lines.append(f"{p['seq']}-{p['name']}: {rate:.1f}% (工人:{p.get('worker_count',1)})")
        else:
            load_lines.append(f"{p['seq']}-{p['name']}: --")
    load_info = "\n".join(load_lines) + "\n"
    info_text = f"瓶颈工序: {bottleneck_info['name'] if bottleneck_info else '无'}\n"
    info_text += f"瓶颈CT: {bottleneck_ct:.2f}s\n"
    info_text += f"线平衡率: {balance:.2f}%\n"
    if tt:
        info_text += f"节拍时间(TT): {tt:.2f}s\n"
    info_text += "\n" + load_info
    self.balance_info_label.config(text=info_text)
    # Advice chosen by balance-rate band.
    if balance < 60:
        advice = "平衡率极低,建议重点优化瓶颈工序,考虑工序拆分或增加工位。"
    elif balance < 80:
        advice = "平衡率一般,可尝试合并相近工位或减少瓶颈CT。"
    elif balance < 90:
        advice = "平衡率良好,轻微调整可达更优。"
    else:
        advice = "平衡率优秀,继续保持。"
    if bottleneck_ct and tt and bottleneck_ct > tt:
        advice += "\n警告:瓶颈CT超过节拍时间,无法满足目标产量,需改善!"
    self.advice_label.config(text=advice)
    # Refresh the simulation combobox and the snapshot list.
    self.sim_process_combo['values'] = names
    self.sim_process_combo.current(0)
    self.refresh_snapshot_list(product)
def apply_simulation(self):
    """Redraw the chart with one process's CT replaced by a trial value.

    The process is identified by the sequence number before the first "-"
    in the simulation combobox; its ``std_ct`` is overwritten in a copy of
    ``self.current_processes_sim``, which then becomes the new simulation
    state, so successive simulations accumulate. The database is not
    modified.

    Shows a warning when no data is loaded and an error when the process
    text or the new CT cannot be parsed.
    """
    if not self.current_processes_sim:
        messagebox.showwarning("提示", "请先加载产品数据")
        return
    sim_str = self.sim_process_combo.get()
    if not sim_str:
        return
    try:
        # The combobox is editable, so its text may not start with a number.
        seq = int(sim_str.split("-")[0])
    except ValueError:
        messagebox.showerror("错误", "请选择有效的工序")
        return
    new_ct_str = self.sim_ct_entry.get()
    if not new_ct_str:
        return
    try:
        new_ct = float(new_ct_str)
    except ValueError:
        messagebox.showerror("错误", "请输入有效数字")
        return
    sim_processes = [dict(p) for p in self.current_processes_sim]
    for p in sim_processes:
        if p["seq"] == seq:
            p["std_ct"] = new_ct
            break
    self.current_processes_sim = sim_processes
    bottleneck_info, balance, bottleneck_ct = calculate_bottleneck_and_balance(sim_processes)
    product = self.balance_product_combo.get()
    shift = self.balance_shift_combo.get()
    try:
        shifts_per_day = int(self.balance_shifts_spin.get())
    except ValueError:
        # Empty or hand-typed non-integer spinbox text falls back to one shift.
        shifts_per_day = 1
    tt = None
    target_str = self.balance_target_entry.get().strip()
    if target_str and shift:
        try:
            target = float(target_str)
        except ValueError:
            # The target is optional; an unparsable value means "no takt line".
            target = 0.0
        if target > 0:
            tt = calc_planned_uptime_min(shift) * 60 * shifts_per_day / target
    self.balance_ax.clear()
    names = [f"{p['seq']}-{p['name']}" for p in sim_processes]
    ct_values = [p["std_ct"] if p["std_ct"] is not None else (p["meas_ct"] if p["meas_ct"] is not None else 0) for p in sim_processes]
    colors = ['#d62728' if bottleneck_info and p['seq'] == bottleneck_info['seq'] else '#1f77b4' for p in sim_processes]
    bars = self.balance_ax.bar(names, ct_values, color=colors, edgecolor='black')
    has_legend_entries = False
    if bottleneck_ct > 0:
        self.balance_ax.axhline(y=bottleneck_ct, color='red', linestyle='--', linewidth=2, label=f'瓶颈CT={bottleneck_ct:.2f}s')
        has_legend_entries = True
    if tt:
        self.balance_ax.axhline(y=tt, color='green', linestyle=':', linewidth=2, label=f'TT={tt:.2f}s')
        has_legend_entries = True
    self.balance_ax.set_ylabel('时间 (秒)', fontsize=10)
    self.balance_ax.set_title(f'{product} (模拟) 线平衡图 (平衡率={balance:.2f}%)', fontsize=11)
    if has_legend_entries:
        # legend() with no labelled artists only emits a warning.
        self.balance_ax.legend()
    for bar, val in zip(bars, ct_values):
        self.balance_ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3, f'{val:.2f}',
                             ha='center', va='bottom', fontsize=8)
    self.balance_figure.tight_layout()
    self.balance_canvas.draw()
def export_chart_image(self):
    """Save the current balance chart to an image file chosen by the user.

    Does nothing when the file dialog is cancelled. The figure is written
    at 150 dpi with a tight bounding box. A failure to write the file is
    reported in an error dialog.
    """
    file_path = filedialog.asksaveasfilename(defaultextension=".png", filetypes=[("PNG图片", "*.png"), ("所有文件", "*.*")])
    if not file_path:
        return
    try:
        self.balance_figure.savefig(file_path, dpi=150, bbox_inches='tight')
    except (OSError, ValueError) as exc:
        # OSError: path not writable; ValueError: matplotlib rejects the
        # file extension as an unsupported format.
        messagebox.showerror("错误", f"图表保存失败: {exc}")
        return
    messagebox.showinfo("成功", f"图表已保存至 {file_path}")
def auto_merge_suggestion(self):
    """Suggest adjacent processes whose CTs are both low enough to merge.

    Walks every adjacent pair in ``self.current_processes_sim`` and
    suggests merging a pair when both CTs (standard, else measured) are
    below 60 % of the bottleneck CT. Pairs with a missing CT are skipped.
    Pairs may overlap: process 2 can appear in both "1 and 2" and
    "2 and 3". The result is shown in an info dialog.
    """
    if not self.current_processes_sim:
        messagebox.showwarning("提示", "请先加载产品数据")
        return
    processes = self.current_processes_sim
    # The bottleneck does not change during the scan, so compute it once
    # instead of once per pair.
    _, _, bottleneck_ct = calculate_bottleneck_and_balance(processes)
    suggestions = []
    if bottleneck_ct > 0:
        threshold = bottleneck_ct * 0.6
        for p1, p2 in zip(processes, processes[1:]):
            ct1 = p1["std_ct"] if p1["std_ct"] is not None else p1["meas_ct"]
            ct2 = p2["std_ct"] if p2["std_ct"] is not None else p2["meas_ct"]
            if ct1 is None or ct2 is None:
                continue
            if ct1 < threshold and ct2 < threshold:
                suggestions.append(f"工序{p1['seq']}和{p2['seq']}可考虑合并(当前CT:{ct1:.2f}+{ct2:.2f}={ct1+ct2:.2f})")
    if suggestions:
        msg = "自动合并建议:\n" + "\n".join(suggestions)
    else:
        msg = "未找到合适的合并建议。"
    messagebox.showinfo("合并建议", msg)
def save_snapshot(self):
    """Store the currently displayed processes as a named snapshot.

    Uses ``self.current_processes_sim`` (which may hold simulated values),
    falling back to the product's stored processes. The snapshot is named
    ``<product>_<YYYYmmdd_HHMMSS>`` and records the balance rate, the
    bottleneck CT and, when a positive target output and a shift are set,
    the takt time, plus one row per process.
    """
    product = self.balance_product_combo.get()
    if not product:
        messagebox.showwarning("提示", "请先选择品番")
        return
    processes = self.current_processes_sim
    if not processes:
        processes, _ = get_product_processes(product)
    if not processes:
        messagebox.showwarning("提示", "无工序数据")
        return
    bottleneck_info, balance, bottleneck_ct = calculate_bottleneck_and_balance(processes)
    shift = self.balance_shift_combo.get()
    try:
        shifts_per_day = int(self.balance_shifts_spin.get())
    except ValueError:
        # Empty or hand-typed non-integer spinbox text falls back to one shift.
        shifts_per_day = 1
    tt = None
    target_str = self.balance_target_entry.get().strip()
    if target_str and shift:
        try:
            target = float(target_str)
        except ValueError:
            # The target is optional; an unparsable value stores no takt time.
            target = 0.0
        if target > 0:
            tt = calc_planned_uptime_min(shift) * 60 * shifts_per_day / target
    snapshot_name = f"{product}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("INSERT INTO line_balance_snapshots (snapshot_name, product_model, line_id, balance_rate, bottleneck_ct, takt_time) VALUES (?,?,?,?,?,?)",
                  (snapshot_name, product, None, balance, bottleneck_ct, tt))
        snapshot_id = c.lastrowid
        c.executemany("INSERT INTO snapshot_processes (snapshot_id, seq, process_name, standard_ct_sec, measured_ct_sec) VALUES (?,?,?,?,?)",
                      [(snapshot_id, p["seq"], p["name"], p["std_ct"], p["meas_ct"]) for p in processes])
        # Header and process rows are committed together.
        conn.commit()
    finally:
        conn.close()
    self.refresh_snapshot_list(product)
    messagebox.showinfo("成功", f"快照 {snapshot_name} 已保存")
def refresh_snapshot_list(self, product):
    """Show the snapshots saved for ``product``, newest first.

    Selects the first snapshot when there is one, and clears the combobox
    text when the product has none.

    Args:
        product: Product model whose snapshots are listed.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT snapshot_name FROM line_balance_snapshots WHERE product_model=? ORDER BY created_at DESC", (product,))
        snaps = [r[0] for r in c.fetchall()]
    finally:
        conn.close()
    self.snapshot_combo['values'] = snaps
    if snaps:
        self.snapshot_combo.current(0)
    else:
        # Replacing 'values' does not change the displayed text; without
        # this a snapshot of the previous product would stay selected and
        # could be loaded or compared against the wrong product.
        self.snapshot_combo.set("")
def load_snapshot(self):
    """Load the selected snapshot and draw it in the balance chart.

    The snapshot's processes replace ``self.current_processes_sim`` and are
    drawn the same way as the live chart, with the snapshot name in the
    title. The takt line uses the target output, shift and shift count
    currently entered on the tab, not values stored with the snapshot.
    """
    snap_name = self.snapshot_combo.get()
    if not snap_name:
        return
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM line_balance_snapshots WHERE snapshot_name=?", (snap_name,))
        row = c.fetchone()
        rows = None
        if row:
            c.execute("SELECT seq, process_name, standard_ct_sec, measured_ct_sec FROM snapshot_processes WHERE snapshot_id=? ORDER BY seq", (row[0],))
            rows = c.fetchall()
    finally:
        conn.close()
    if rows is None:
        messagebox.showerror("错误", "快照不存在")
        return
    processes = [{"seq": r[0], "name": r[1], "std_ct": r[2], "meas_ct": r[3]} for r in rows]
    self.current_processes_sim = processes
    bottleneck_info, balance, bottleneck_ct = calculate_bottleneck_and_balance(processes)
    shift = self.balance_shift_combo.get()
    try:
        shifts_per_day = int(self.balance_shifts_spin.get())
    except ValueError:
        # Empty or hand-typed non-integer spinbox text falls back to one shift.
        shifts_per_day = 1
    tt = None
    target_str = self.balance_target_entry.get().strip()
    if target_str and shift:
        try:
            target = float(target_str)
        except ValueError:
            # The target is optional; an unparsable value means "no takt line".
            target = 0.0
        if target > 0:
            tt = calc_planned_uptime_min(shift) * 60 * shifts_per_day / target
    self.balance_ax.clear()
    names = [f"{p['seq']}-{p['name']}" for p in processes]
    ct_values = [p["std_ct"] if p["std_ct"] is not None else (p["meas_ct"] if p["meas_ct"] is not None else 0) for p in processes]
    colors = ['#d62728' if bottleneck_info and p['seq'] == bottleneck_info['seq'] else '#1f77b4' for p in processes]
    bars = self.balance_ax.bar(names, ct_values, color=colors, edgecolor='black')
    has_legend_entries = False
    if bottleneck_ct > 0:
        self.balance_ax.axhline(y=bottleneck_ct, color='red', linestyle='--', linewidth=2, label=f'瓶颈CT={bottleneck_ct:.2f}s')
        has_legend_entries = True
    if tt:
        self.balance_ax.axhline(y=tt, color='green', linestyle=':', linewidth=2, label=f'TT={tt:.2f}s')
        has_legend_entries = True
    self.balance_ax.set_ylabel('时间 (秒)', fontsize=10)
    self.balance_ax.set_title(f"快照: {snap_name} (平衡率={balance:.1f}%)", fontsize=11)
    if has_legend_entries:
        # legend() with no labelled artists only emits a warning.
        self.balance_ax.legend()
    for bar, val in zip(bars, ct_values):
        self.balance_ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3, f'{val:.2f}',
                             ha='center', va='bottom', fontsize=8)
    self.balance_figure.tight_layout()
    self.balance_canvas.draw()
def compare_snapshot(self):
    """Draw current CTs and the selected snapshot's CTs side by side.

    The x axis is the union of sequence numbers from both sides. Each CT is
    the standard CT, else the measured CT. A sequence number that exists on
    only one side, or has no CT at all, is drawn with height 0 on the side
    where it is missing.
    """
    snap_name = self.snapshot_combo.get()
    if not snap_name:
        return
    product = self.balance_product_combo.get()
    if not product:
        return
    cur_processes = self.current_processes_sim
    if not cur_processes:
        cur_processes, _ = get_product_processes(product)
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM line_balance_snapshots WHERE snapshot_name=?", (snap_name,))
        row = c.fetchone()
        if not row:
            return
        c.execute("SELECT seq, process_name, standard_ct_sec, measured_ct_sec FROM snapshot_processes WHERE snapshot_id=? ORDER BY seq", (row[0],))
        snap_processes = [{"seq": r[0], "name": r[1], "std_ct": r[2], "meas_ct": r[3]} for r in c.fetchall()]
    finally:
        conn.close()

    def ct_by_seq(procs):
        # First process per seq wins, matching a first-match lookup.
        table = {}
        for p in procs:
            ct = p["std_ct"] if p["std_ct"] is not None else p["meas_ct"]
            table.setdefault(p['seq'], ct)
        return table

    cur_map = ct_by_seq(cur_processes)
    snap_map = ct_by_seq(snap_processes)
    all_seqs = sorted(set(cur_map) | set(snap_map))
    # ``or 0`` covers both a seq missing on one side (previously an
    # unhandled StopIteration) and a process without any CT (previously
    # None was passed to bar()).
    cur_cts = [cur_map.get(s) or 0 for s in all_seqs]
    snap_cts = [snap_map.get(s) or 0 for s in all_seqs]
    names = [str(s) for s in all_seqs]
    x = list(range(len(all_seqs)))
    width = 0.35
    self.balance_ax.clear()
    self.balance_ax.bar([i - width/2 for i in x], cur_cts, width, label='当前', color='#1f77b4', edgecolor='black')
    self.balance_ax.bar([i + width/2 for i in x], snap_cts, width, label=f'快照:{snap_name}', color='#ff7f0e', edgecolor='black')
    self.balance_ax.set_xticks(x)
    self.balance_ax.set_xticklabels(names)
    self.balance_ax.legend()
    self.balance_ax.set_ylabel('时间 (秒)', fontsize=10)
    self.balance_ax.set_title('线平衡对比', fontsize=11)
    self.balance_figure.tight_layout()
    self.balance_canvas.draw()
# ================== 报表导出 ==================
def build_report_tab(self):
    """Build the report tab inside ``self.tab5``.

    A button row triggers the three Excel exports; below it a query group
    holds a keyword entry, a search button calling ``search_data`` and a
    four-column result table.
    """
    toolbar = ttk.Frame(self.tab5)
    toolbar.pack(fill=tk.X, padx=10, pady=10)
    export_actions = (
        ("导出工序明细", self.export_process),
        ("导出瓶颈分析", self.export_bottleneck),
        ("导出产能日志", self.export_capacity_log),
    )
    for caption, handler in export_actions:
        ttk.Button(toolbar, text=caption, command=handler).pack(side=tk.LEFT, padx=5)

    query_box = ttk.LabelFrame(self.tab5, text="查询")
    query_box.pack(fill=tk.X, padx=10, pady=5)
    ttk.Label(query_box, text="品番关键词:").grid(row=0, column=0)
    self.query_entry = ttk.Entry(query_box, width=20)
    self.query_entry.grid(row=0, column=1)
    ttk.Button(query_box, text="搜索", command=self.search_data).grid(row=0, column=2, padx=5)

    # Column id -> heading text, in display order.
    headings = {"prod": "品番", "proc": "工序", "std": "标准CT", "meas": "实测CT"}
    self.query_tree = ttk.Treeview(query_box, columns=tuple(headings), show="headings", height=6)
    for column_id, title in headings.items():
        self.query_tree.heading(column_id, text=title)
    self.query_tree.grid(row=1, column=0, columnspan=3, sticky="ew", pady=5)
    query_box.columnconfigure(0, weight=1)
def export_process(self):
    """Export the process details of every product to an .xlsx file.

    Asks the user for a target path (returns silently if the dialog is
    cancelled), reads one row per process joined with its product, its
    production line and its manpower record (worker count falls back to 1
    when no manpower row exists), and writes the rows under a header line.

    The database connection is always closed, even when the query fails,
    and a failed save (for example when the target file is locked by
    another program) is reported in an error dialog instead of being lost
    in the Tk callback traceback.
    """
    path = filedialog.asksaveasfilename(defaultextension=".xlsx")
    if not path:
        return

    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute(
            """SELECT l.line_name, p.product_model, pp.seq, pp.process_name,
                      pp.standard_ct_sec, pp.measured_ct_sec, pp.notes,
                      COALESCE(pm.worker_count, 1)
               FROM product_processes pp
               JOIN products p ON pp.product_id = p.id
               LEFT JOIN production_lines l ON p.line_id = l.id
               LEFT JOIN process_manpower pm ON pp.id = pm.process_id
               ORDER BY l.line_name, p.product_model, pp.seq"""
        ).fetchall()
    finally:
        # Release the connection before touching the workbook / file system.
        conn.close()

    wb = Workbook()
    ws = wb.active
    ws.append(["产线", "品番", "序号", "工序", "标准CT", "实测CT", "备注", "员工人数"])
    for row in rows:
        ws.append(row)

    try:
        wb.save(path)
    except OSError as exc:
        # PermissionError (file open elsewhere), missing directory, full disk, ...
        messagebox.showerror("错误", f"导出失败: {exc}")
        return
    messagebox.showinfo("完成", "已导出")
def export_bottleneck(self):
    """Export the bottleneck analysis of every product to an .xlsx file.

    Asks the user for a target path (returns silently if the dialog is
    cancelled). For each product, the process list is loaded with
    ``get_product_processes`` and analysed with
    ``calculate_bottleneck_and_balance``; products without processes or
    without a bottleneck result are skipped. Each exported row holds the
    line name, product model, bottleneck process name, bottleneck CT and
    balance rate.

    The product list is read first and the connection is closed before the
    per-product helpers run, so the connection is neither held open during
    the analysis nor leaked if a helper raises. A failed save is reported
    in an error dialog.
    """
    path = filedialog.asksaveasfilename(defaultextension=".xlsx")
    if not path:
        return

    conn = sqlite3.connect(DB_NAME)
    try:
        products = conn.execute(
            "SELECT l.line_name, p.product_model "
            "FROM products p LEFT JOIN production_lines l ON p.line_id=l.id"
        ).fetchall()
    finally:
        conn.close()

    wb = Workbook()
    ws = wb.active
    ws.append(["产线", "品番", "瓶颈工序", "瓶颈CT", "平衡率"])
    for line_name, product in products:
        processes, _ = get_product_processes(product)
        if not processes:
            continue
        bottleneck, balance_rate, bottleneck_ct = calculate_bottleneck_and_balance(processes)
        if bottleneck:
            ws.append([line_name, product, bottleneck['name'], bottleneck_ct, balance_rate])

    try:
        wb.save(path)
    except OSError as exc:
        # PermissionError (file open elsewhere), missing directory, full disk, ...
        messagebox.showerror("错误", f"导出失败: {exc}")
        return
    messagebox.showinfo("完成", "已导出")
def export_capacity_log(self):
    """Export the capacity calculation log to an .xlsx file, newest first.

    Asks the user for a target path (returns silently if the dialog is
    cancelled). Each exported row holds the calculation time, line name,
    product model, shift name, bottleneck CT, hourly capacity and shift
    capacity.

    The line name is resolved with a single LEFT JOIN instead of one extra
    query per log row; log rows whose ``line_id`` is empty or no longer
    matches a production line get an empty line name. The connection is
    always closed, and a failed save is reported in an error dialog.
    """
    path = filedialog.asksaveasfilename(defaultextension=".xlsx")
    if not path:
        return

    conn = sqlite3.connect(DB_NAME)
    try:
        rows = conn.execute(
            """SELECT cl.calc_time, COALESCE(l.line_name, ''), cl.product_model,
                      cl.shift_name, cl.bottleneck_ct, cl.hourly_cap, cl.shift_cap
               FROM capacity_logs cl
               LEFT JOIN production_lines l ON cl.line_id = l.id
               ORDER BY cl.calc_time DESC"""
        ).fetchall()
    finally:
        conn.close()

    wb = Workbook()
    ws = wb.active
    ws.append(["时间", "产线", "品番", "班次", "瓶颈CT", "小时产能", "班次产能"])
    for row in rows:
        ws.append(list(row))

    try:
        wb.save(path)
    except OSError as exc:
        # PermissionError (file open elsewhere), missing directory, full disk, ...
        messagebox.showerror("错误", f"导出失败: {exc}")
        return
    messagebox.showinfo("完成", "已导出")
def search_data(self):
    """Fill ``self.query_tree`` with processes whose product model contains the keyword.

    The keyword is read from ``self.query_entry`` and stripped. An empty
    keyword lists every product's processes. Rows are ordered by product
    model and process sequence and show the product model, process name,
    standard CT and measured CT.

    The keyword is matched literally: ``%`` and ``_`` typed by the user are
    escaped so they are not interpreted as SQL LIKE wildcards. The
    connection is always closed, even when the query fails.
    """
    kw = self.query_entry.get().strip()
    # Clear the previous result set in one call.
    self.query_tree.delete(*self.query_tree.get_children())

    # Escape the escape character first, then the two LIKE wildcards.
    literal_kw = kw.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    conn = sqlite3.connect(DB_NAME)
    try:
        # With an empty keyword the pattern is '%%', which matches every
        # product model, so no separate unfiltered query is needed.
        rows = conn.execute(
            """SELECT p.product_model, pp.process_name, pp.standard_ct_sec, pp.measured_ct_sec
               FROM products p JOIN product_processes pp ON p.id=pp.product_id
               WHERE p.product_model LIKE ? ESCAPE '\\'
               ORDER BY p.product_model, pp.seq""",
            (f"%{literal_kw}%",),
        ).fetchall()
    finally:
        conn.close()

    for row in rows:
        self.query_tree.insert("", tk.END, values=row)
# Script entry point: runs only when the file is executed directly, not on import.
# init_db() is called first so the tables and default parameters exist before the
# window is created; MainApplication is then instantiated and its mainloop() is
# entered, which keeps the program running until the GUI exits.
if __name__ == "__main__":
init_db()
app = MainApplication()
app.mainloop()