好友
阅读权限10
听众
最后登录1970-1-1
|
结构如下:
就三个文档
执行Agu.py 就行了。
Agu.py
[Python] 纯文本查看 复制代码 # !/usr/bin/env python
# coding=utf-8
#执行python3 -s XXX.PY
from __future__ import (absolute_import, division, print_function, unicode_literals)
import os
import sys
import threading
import queue
from datetime import datetime
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import akshare as ak
import pandas as pd
from ZTReport import ZTDatabase, generate_html_report
# 日志队列用于跨线程通信
log_queue = queue.Queue()
class RedirectText:
"""重定向输出到Tkinter组件"""
def __init__(self, widget):
self.widget = widget
def write(self, text):
log_queue.put(text)
def flush(self):
pass
class StockZTApp:
def __init__(self, root):
self.root = root
root.title("涨停板数据采集系统 v2.0")
root.geometry("800x650")
# 创建主框架
main_frame = ttk.Frame(root, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# 路径选择组件
path_frame = ttk.Frame(main_frame)
path_frame.pack(fill=tk.X, pady=5)
ttk.Label(path_frame, text="保存目录:").pack(side=tk.LEFT)
self.path_var = tk.StringVar()
self.path_var.set(os.getcwd()) # 默认当前目录
ttk.Entry(path_frame, textvariable=self.path_var, width=50).pack(side=tk.LEFT, padx=5)
ttk.Button(path_frame, text="选择目录", command=self.select_directory).pack(side=tk.LEFT)
# 操作按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=5)
self.start_button = ttk.Button(
btn_frame,
text="开始采集",
command=self.start_crawling
)
self.start_button.pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="查看数据库统计", command=self.show_db_stats).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="打开最新HTML报告", command=self.open_latest_html).pack(side=tk.LEFT, padx=5)
# 日志输出
log_frame = ttk.Frame(main_frame)
log_frame.pack(fill=tk.BOTH, expand=True)
self.log_text = tk.Text(log_frame, wrap=tk.WORD, state='disabled')
vsb = ttk.Scrollbar(log_frame, orient="vertical", command=self.log_text.yview)
self.log_text.configure(yscrollcommand=vsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.pack(fill=tk.BOTH, expand=True)
# 初始化输出重定向
sys.stdout = RedirectText(self.log_text)
sys.stderr = RedirectText(self.log_text)
# 定时检查日志队列
self.root.after(100, self.update_log)
# 绑定采集完成事件
self.root.bind("<<CrawlComplete>>", self.on_crawl_complete)
def show_db_stats(self):
"""显示数据库统计信息"""
try:
db = ZTDatabase()
stats = db.get_stats()
dates = db.get_all_dates()
msg = (f"数据库路径:{db.db_path}\n\n"
f"数据天数:{stats['total_days']} 天\n"
f"总记录数:{stats['total_records']} 条\n"
f"日期范围:{stats['date_range']}\n\n"
f"已有日期:\n" + "\n".join(dates[:20]))
messagebox.showinfo("数据库统计", msg)
except Exception as e:
messagebox.showerror("错误", f"查询数据库失败:{e}")
def open_latest_html(self):
"""打开最新的HTML报告"""
import webbrowser
save_dir = self.path_var.get() or os.getcwd()
files = [f for f in os.listdir(save_dir) if f.startswith('涨停板报告') and f.endswith('.html')]
if not files:
messagebox.showwarning("提示", "未找到HTML报告文件,请先采集数据")
return
files.sort(reverse=True)
webbrowser.open(os.path.join(save_dir, files[0]))
def select_directory(self):
"""选择保存目录"""
path = filedialog.askdirectory()
if path:
self.path_var.set(path)
print(f"已设置保存路径:{path}")
def start_crawling(self):
"""启动采集线程"""
if not self.path_var.get():
messagebox.showwarning("警告", "请先选择保存目录!")
return
self.start_button.config(state=tk.DISABLED)
print("开始采集数据,请稍候...")
threading.Thread(
target=self.run_crawler,
args=(self.path_var.get(),),
daemon=True
).start()
def run_crawler(self, save_directory):
"""执行实际的采集任务"""
try:
stock_zt_pool(save_directory)
log_queue.put("\n采集完成!")
except Exception as e:
log_queue.put(f"\n发生错误:{str(e)}")
finally:
self.root.event_generate("<<CrawlComplete>>")
def on_crawl_complete(self, event):
"""采集完成后的回调"""
self.start_button.config(state=tk.NORMAL)
print("操作状态已重置")
def update_log(self):
"""更新日志显示"""
while not log_queue.empty():
msg = log_queue.get()
self.log_text.configure(state='normal')
self.log_text.insert(tk.END, msg)
self.log_text.see(tk.END)
self.log_text.configure(state='disabled')
self.root.after(100, self.update_log)
def stock_zt_pool(save_directory):
# 创建保存目录(如果不存在)
if not os.path.exists(save_directory):
os.makedirs(save_directory)
print(f"已创建目录:{save_directory}")
curr_date = datetime.now().strftime('%Y%m%d')
print(f"\n正在获取{curr_date}涨停板数据...")
try:
stock_zt_pool_em_df = ak.stock_zt_pool_em(date=curr_date)
except Exception as e:
print("获取数据失败:", e)
return
# 数据过滤
exclude_strings = ["ST", "退", "PT", "N", "C"]
pattern = '|'.join(exclude_strings)
mask = ~(
stock_zt_pool_em_df['代码'].str.startswith(("2", "4", "9")) |
stock_zt_pool_em_df['名称'].str.contains(pattern)
)
filtered_df = stock_zt_pool_em_df[mask].reset_index(drop=True)
filtered_df['主力净流入-净占比'] = 0.00
print(f"找到{len(filtered_df)}只有效涨停股票")
# 获取资金流数据
for index, row in filtered_df.iterrows():
code = row['代码']
name = row['名称']
print(f"正在处理 [{index + 1}/{len(filtered_df)}] {code}-{name}")
try:
market = "sh" if code.startswith("6") else "bj" if code.startswith("8") else "sz"
fund_flow = ak.stock_individual_fund_flow(stock=code, market=market)
filtered_df.at[index, '主力净流入-净占比'] = fund_flow['主力净流入-净占比'].iloc[-1]
except Exception as e:
print(f" 获取{code}资金流失败:{str(e)}")
continue
# 数据加工
filtered_df['封单占成交'] = round(filtered_df['封板资金'] / filtered_df['成交额'] * 100, 2)
filtered_df.insert(15, '换手率', filtered_df.pop('换手率'))
sorted_df = filtered_df.sort_values(by='主力净流入-净占比', ascending=False)
# 保存Excel文件
file_name = f"涨停板行情_{curr_date}.xlsx"
save_path = os.path.join(save_directory, file_name)
try:
sorted_df.to_excel(save_path, index=False)
print(f"\nExcel文件保存成功:{save_path}")
except Exception as e:
print(f"Excel文件保存失败:{str(e)}")
# 保存到数据库
try:
db = ZTDatabase()
db.save_daily_data(sorted_df, curr_date)
print(f"数据库保存成功")
except Exception as e:
print(f"数据库保存失败:{str(e)}")
# 生成HTML报告
try:
html_path = generate_html_report(sorted_df, curr_date, save_directory)
print(f"HTML报告生成成功:{html_path}")
except Exception as e:
print(f"HTML报告生成失败:{str(e)}")
if __name__ == '__main__':
root = tk.Tk()
app = StockZTApp(root)
root.mainloop()
ZTReport.py
[Python] 纯文本查看 复制代码 # !/usr/bin/env python
# coding=utf-8
"""
涨停板数据库存储 + HTML报告生成
"""
import os
import sqlite3
from datetime import datetime
import pandas as pd
# ==================== 数据库操作 ====================
class ZTDatabase:
"""涨停板数据SQLite数据库管理"""
def __init__(self, db_path=None):
if db_path is None:
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '涨停板数据.db')
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS zt_daily (
id INTEGER PRIMARY KEY AUTOINCREMENT,
trade_date TEXT NOT NULL,
code TEXT NOT NULL,
name TEXT,
change_pct REAL,
price REAL,
turnover REAL,
float_mv REAL,
total_mv REAL,
seal_fund REAL,
first_seal_time TEXT,
last_seal_time TEXT,
break_count INTEGER,
zt_stat TEXT,
lianban INTEGER,
industry TEXT,
turnover_rate REAL,
fund_net_pct REAL,
seal_ratio REAL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(trade_date, code)
)
''')
conn.commit()
conn.close()
def save_daily_data(self, df, trade_date):
"""保存每日涨停数据"""
conn = sqlite3.connect(self.db_path)
column_map = {
'代码': 'code', '名称': 'name', '涨跌幅': 'change_pct',
'最新价': 'price', '成交额': 'turnover', '流通市值': 'float_mv',
'总市值': 'total_mv', '封板资金': 'seal_fund',
'首次封板时间': 'first_seal_time', '最后封板时间': 'last_seal_time',
'炸板次数': 'break_count', '涨停统计': 'zt_stat', '连板数': 'lianban',
'所属行业': 'industry', '换手率': 'turnover_rate',
'主力净流入-净占比': 'fund_net_pct', '封单占成交': 'seal_ratio',
}
save_df = df[list(column_map.keys())].rename(columns=column_map)
save_df['trade_date'] = trade_date
existing = pd.read_sql_query(
f"SELECT code FROM zt_daily WHERE trade_date='{trade_date}'", conn)
new_codes = set(save_df['code']) - set(existing['code'])
save_df = save_df[save_df['code'].isin(new_codes)]
if len(save_df) > 0:
save_df.to_sql('zt_daily', conn, if_exists='append', index=False)
print(f"数据库保存成功:{len(save_df)} 条新记录 (日期: {trade_date})")
else:
print(f"日期 {trade_date} 数据已存在,无需重复保存")
conn.close()
def load_daily_data(self, trade_date):
"""查询某日数据"""
conn = sqlite3.connect(self.db_path)
df = pd.read_sql_query(
f"SELECT * FROM zt_daily WHERE trade_date='{trade_date}' ORDER BY fund_net_pct DESC",
conn)
conn.close()
return df
def load_date_range(self, start_date, end_date):
"""查询日期范围内所有数据"""
conn = sqlite3.connect(self.db_path)
df = pd.read_sql_query(
f"SELECT * FROM zt_daily WHERE trade_date BETWEEN '{start_date}' AND '{end_date}' ORDER BY trade_date, fund_net_pct DESC",
conn)
conn.close()
return df
def get_all_dates(self):
"""获取所有已有数据的日期"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute("SELECT DISTINCT trade_date FROM zt_daily ORDER BY trade_date DESC")
dates = [row[0] for row in cursor.fetchall()]
conn.close()
return dates
def get_stats(self):
"""获取数据库统计信息"""
conn = sqlite3.connect(self.db_path)
cursor = conn.execute("SELECT COUNT(DISTINCT trade_date) FROM zt_daily")
total_days = cursor.fetchone()[0]
cursor = conn.execute("SELECT COUNT(*) FROM zt_daily")
total_records = cursor.fetchone()[0]
cursor = conn.execute("SELECT MIN(trade_date), MAX(trade_date) FROM zt_daily")
min_d, max_d = cursor.fetchone()
conn.close()
return {
'total_days': total_days,
'total_records': total_records,
'date_range': f"{min_d} ~ {max_d}" if min_d else "无数据"
}
# ==================== HTML报告生成 ====================
def _normalize_columns(df):
"""统一列名,兼容中文和英文列名"""
rename_map = {
'代码': 'code', '名称': 'name', '涨跌幅': 'change_pct',
'最新价': 'price', '成交额': 'turnover', '流通市值': 'float_mv',
'总市值': 'total_mv', '封板资金': 'seal_fund',
'首次封板时间': 'first_seal_time', '最后封板时间': 'last_seal_time',
'炸板次数': 'break_count', '涨停统计': 'zt_stat', '连板数': 'lianban',
'所属行业': 'industry', '换手率': 'turnover_rate',
'主力净流入-净占比': 'fund_net_pct', '封单占成交': 'seal_ratio',
}
result = df.copy()
for cn, en in rename_map.items():
if cn in result.columns and en not in result.columns:
result.rename(columns={cn: en}, inplace=True)
return result
def generate_html_report(df, trade_date, save_dir=None):
"""生成HTML格式分析报告"""
if df is None or len(df) == 0:
print("无数据可分析")
return None
if save_dir is None:
save_dir = os.path.dirname(os.path.abspath(__file__))
df = _normalize_columns(df)
total = len(df)
# ========== 行业分布 ==========
industry_count = df['industry'].value_counts()
industry_rows = ""
for industry, count in industry_count.items():
pct = count / total * 100
ind_df = df[df['industry'] == industry]
max_lianban = ind_df['lianban'].max()
bar_width = int(count / total * 100)
industry_rows += f"""
<tr>
<td>{industry}</td>
<td class="num">{count}</td>
<td class="num">{pct:.1f}%</td>
<td class="num">{max_lianban}板</td>
<td><div class="bar" style="width:{bar_width}%"></div></td>
</tr>"""
# ========== 连板分析 ==========
lianban_groups = df.sort_values('lianban', ascending=False).groupby('lianban')
lianban_detail = ""
for lb, group in sorted(lianban_groups, reverse=True):
if lb < 2:
continue
lianban_detail += f'<div class="lianban-section"><h4>{lb}连板 ({len(group)}只)</h4><table class="data-table"><tr><th>代码</th><th>名称</th><th>行业</th><th>炸板次数</th><th>封单占比</th><th>主力净流入</th></tr>'
for _, row in group.iterrows():
fund_class = "positive" if row['fund_net_pct'] > 0 else "negative"
lianban_detail += f"""<tr>
<td>{row['code']}</td><td>{row['name']}</td><td>{row['industry']}</td>
<td class="num">{row['break_count']}</td><td class="num">{row['seal_ratio']:.1f}%</td>
<td class="num {fund_class}">{row['fund_net_pct']:.2f}%</td>
</tr>"""
lianban_detail += "</table></div>"
# ========== 资金流向 TOP10 ==========
df_sorted = df.sort_values('fund_net_pct', ascending=False)
fund_top10 = ""
for _, row in df_sorted.head(10).iterrows():
fund_class = "positive" if row['fund_net_pct'] > 0 else "negative"
fund_top10 += f"""<tr>
<td>{row['code']}</td><td>{row['name']}</td><td>{row['industry']}</td>
<td class="num {fund_class}">{row['fund_net_pct']:.2f}%</td>
<td class="num">{row['seal_ratio']:.1f}%</td><td class="num">{row['lianban']}板</td>
</tr>"""
# ========== 封板力度 TOP10 ==========
df_seal = df.sort_values('seal_ratio', ascending=False)
seal_top10 = ""
for _, row in df_seal.head(10).iterrows():
seal_top10 += f"""<tr>
<td>{row['code']}</td><td>{row['name']}</td><td>{row['industry']}</td>
<td class="num">{row['seal_ratio']:.1f}%</td>
<td class="num">{row['break_count']}次</td><td class="num">{row['lianban']}板</td>
</tr>"""
# ========== 综合评分 TOP10 ==========
df['score'] = df.apply(
lambda r: (r['fund_net_pct'] + 1) * r['seal_ratio'] / max(r['break_count'], 0.1), axis=1)
df_quality = df.sort_values('score', ascending=False)
quality_top10 = ""
for _, row in df_quality.head(10).iterrows():
quality_top10 += f"""<tr>
<td>{row['code']}</td><td>{row['name']}</td><td>{row['industry']}</td>
<td class="num">{row['fund_net_pct']:.2f}%</td>
<td class="num">{row['seal_ratio']:.1f}%</td>
<td class="num">{row['break_count']}次</td>
<td class="num score">{row['score']:.1f}</td>
</tr>"""
# ========== 行业分布图 (简单文字柱图) ==========
chart_bars = ""
max_count = industry_count.iloc[0] if len(industry_count) > 0 else 1
for industry, count in industry_count.head(10).items():
width = int(count / max_count * 100)
chart_bars += f"""<div class="chart-row">
<span class="chart-label">{industry}</span>
<div class="chart-bar" style="width:{width}%">{count}</div>
</div>"""
# ========== 连板统计柱图 ==========
lianban_count = df['lianban'].value_counts().sort_index(ascending=False)
lianban_bars = ""
lb_max = lianban_count.iloc[0] if len(lianban_count) > 0 else 1
for lb, count in lianban_count.items():
width = int(count / lb_max * 100)
lianban_bars += f"""<div class="chart-row">
<span class="chart-label">{lb}连板</span>
<div class="chart-bar lb-bar" style="width:{width}%">{count}</div>
</div>"""
# ========== 生成完整HTML ==========
html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>涨停板分析报告 - {trade_date}</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{ font-family: 'Microsoft YaHei', sans-serif; background: #f0f2f5; color: #333; }}
.container {{ max-width: 1000px; margin: 0 auto; padding: 20px; }}
.header {{ background: linear-gradient(135deg, #e74c3c, #c0392b); color: white;
padding: 30px; border-radius: 12px; margin-bottom: 20px; text-align: center; }}
.header h1 {{ font-size: 28px; margin-bottom: 8px; }}
.header p {{ font-size: 16px; opacity: 0.9; }}
.stats-bar {{ display: flex; justify-content: space-around; background: white;
padding: 20px; border-radius: 10px; margin-bottom: 20px;
box-shadow: 0 2px 8px rgba(0,0,0,0.08); }}
.stat-item {{ text-align: center; }}
.stat-num {{ font-size: 28px; font-weight: bold; color: #e74c3c; }}
.stat-label {{ font-size: 13px; color: #888; margin-top: 4px; }}
.card {{ background: white; border-radius: 10px; padding: 24px;
margin-bottom: 20px; box-shadow: 0 2px 8px rgba(0,0,0,0.08); }}
.card h2 {{ font-size: 20px; margin-bottom: 16px; padding-bottom: 10px;
border-bottom: 2px solid #e74c3c; color: #2c3e50; }}
.card h4 {{ font-size: 15px; margin: 12px 0 8px; color: #555; }}
.data-table {{ width: 100%; border-collapse: collapse; font-size: 13px; }}
.data-table th {{ background: #f8f9fa; padding: 10px 8px; text-align: left;
border-bottom: 2px solid #dee2e6; font-weight: 600; }}
.data-table td {{ padding: 8px; border-bottom: 1px solid #eee; }}
.data-table tr:hover {{ background: #f8f9fa; }}
.num {{ text-align: right; font-family: monospace; }}
.positive {{ color: #e74c3c; font-weight: bold; }}
.negative {{ color: #27ae60; }}
.score {{ color: #e67e22; font-weight: bold; }}
.bar {{ height: 18px; background: linear-gradient(90deg, #e74c3c, #ff6b6b);
border-radius: 4px; min-width: 2px; }}
.chart-row {{ display: flex; align-items: center; margin: 6px 0; }}
.chart-label {{ width: 90px; text-align: right; padding-right: 12px;
font-size: 13px; color: #555; flex-shrink: 0; }}
.chart-bar {{ height: 28px; background: linear-gradient(90deg, #e74c3c, #ff6b6b);
border-radius: 6px; color: white; padding: 0 10px;
font-size: 13px; line-height: 28px; min-width: 30px; text-align: center; }}
.lb-bar {{ background: linear-gradient(90deg, #3498db, #5dade2); }}
.lianban-section {{ margin-bottom: 10px; }}
.two-col {{ display: flex; gap: 20px; }}
.two-col > div {{ flex: 1; }}
.footer {{ text-align: center; color: #aaa; font-size: 12px; padding: 20px; }}
[url=home.php?mod=space&uid=945662]@media[/url] (max-width: 768px) {{
.two-col {{ flex-direction: column; }}
.stats-bar {{ flex-direction: column; gap: 12px; }}
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>涨停板分析报告</h1>
<p>{trade_date[:4]}年{trade_date[4:6]}月{trade_date[6:]}日 | A股涨停复盘</p>
</div>
<div class="stats-bar">
<div class="stat-item">
<div class="stat-num">{total}</div>
<div class="stat-label">涨停股数</div>
</div>
<div class="stat-item">
<div class="stat-num">{len(industry_count)}</div>
<div class="stat-label">涉及行业</div>
</div>
<div class="stat-item">
<div class="stat-num">{len(df[df['lianban'] >= 2])}</div>
<div class="stat-label">2连板以上</div>
</div>
<div class="stat-item">
<div class="stat-num">{len(df[df['fund_net_pct'] > 0])}</div>
<div class="stat-label">主力净流入</div>
</div>
</div>
<!-- 行业分布 -->
<div class="card">
<h2>一、行业分布</h2>
<div class="two-col">
<div>
<h4>行业涨停排行</h4>
{chart_bars}
</div>
<div>
<table class="data-table">
<tr><th>行业</th><th>数量</th><th>占比</th><th>最高连板</th><th>分布</th></tr>
{industry_rows}
</table>
</div>
</div>
</div>
<!-- 连板分析 -->
<div class="card">
<h2>二、连板分析</h2>
<div class="two-col">
<div>
<h4>连板梯队分布</h4>
{lianban_bars}
</div>
<div>
<h4>连板股票明细</h4>
{lianban_detail if lianban_detail else '<p style="color:#999">无2连板以上股票</p>'}
</div>
</div>
</div>
<!-- 资金流向 -->
<div class="card">
<h2>三、资金流向分析</h2>
<div class="two-col">
<div>
<h4>主力净流入 TOP10</h4>
<table class="data-table">
<tr><th>代码</th><th>名称</th><th>行业</th><th>净流入占比</th><th>封单占比</th><th>连板</th></tr>
{fund_top10}
</table>
</div>
<div>
<h4>封板力度 TOP10</h4>
<table class="data-table">
<tr><th>代码</th><th>名称</th><th>行业</th><th>封单占比</th><th>炸板</th><th>连板</th></tr>
{seal_top10}
</table>
</div>
</div>
</div>
<!-- 综合评分 -->
<div class="card">
<h2>四、综合质量评分 TOP10</h2>
<p style="color:#888;font-size:13px;margin-bottom:12px;">
评分 = (主力净流入占比 + 1) × 封单占比 ÷ 炸板次数 |
评分越高 = 封板力度强 + 资金流入 + 封板稳定
</p>
<table class="data-table">
<tr><th>代码</th><th>名称</th><th>行业</th><th>主力净流入</th><th>封单占比</th><th>炸板</th><th>综合评分</th></tr>
{quality_top10}
</table>
</div>
<div class="footer">
<p>涨停板分析报告 - 仅供参考,不构成投资建议</p>
</div>
</div>
</body>
</html>"""
file_name = f"涨停板报告_{trade_date}.html"
file_path = os.path.join(save_dir, file_name)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(html)
print(f"HTML报告已生成:{file_path}")
return file_path
AnalyzeZT.py
[Python] 纯文本查看 复制代码 # !/usr/bin/env python
# coding=utf-8
"""
涨停板数据分析:行业分布 + 连板分析 + 资金流向
"""
import os
import sys
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
class AnalyzeZTApp:
def __init__(self, root):
self.root = root
root.title("涨停板数据分析 v1.0")
root.geometry("850x700")
main_frame = ttk.Frame(root, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# 文件选择
file_frame = ttk.Frame(main_frame)
file_frame.pack(fill=tk.X, pady=5)
ttk.Label(file_frame, text="数据文件:").pack(side=tk.LEFT)
self.file_var = tk.StringVar()
ttk.Entry(file_frame, textvariable=self.file_var, width=50).pack(side=tk.LEFT, padx=5)
ttk.Button(file_frame, text="选择文件", command=self.select_file).pack(side=tk.LEFT)
ttk.Button(file_frame, text="自动查找最新", command=self.auto_find).pack(side=tk.LEFT, padx=5)
# 分析按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=5)
ttk.Button(btn_frame, text="1. 行业分布分析", command=self.analyze_industry).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="2. 连板分析", command=self.analyze_lianban).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="3. 资金流向分析", command=self.analyze_fund).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="全部分析", command=self.analyze_all).pack(side=tk.LEFT, padx=5)
# 结果输出
result_frame = ttk.Frame(main_frame)
result_frame.pack(fill=tk.BOTH, expand=True)
self.result_text = tk.Text(result_frame, wrap=tk.WORD, state='disabled', font=('Consolas', 10))
vsb = ttk.Scrollbar(result_frame, orient="vertical", command=self.result_text.yview)
self.result_text.configure(yscrollcommand=vsb.set)
vsb.pack(side=tk.RIGHT, fill=tk.Y)
self.result_text.pack(fill=tk.BOTH, expand=True)
self.df = None
def select_file(self):
path = filedialog.askopenfilename(
filetypes=[("Excel files", "*.xlsx *.xls"), ("All files", "*.*")])
if path:
self.file_var.set(path)
self.load_data(path)
def auto_find(self):
"""自动查找当前目录下最新的涨停板文件"""
files = [f for f in os.listdir('.') if f.startswith('涨停板行情') and f.endswith('.xlsx')]
if not files:
messagebox.showwarning("提示", "当前目录下未找到涨停板行情文件")
return
files.sort(reverse=True)
self.file_var.set(os.path.abspath(files[0]))
self.load_data(files[0])
def load_data(self, path):
try:
self.df = pd.read_excel(path)
self.show_result(f"已加载数据:{path}\n共 {len(self.df)} 只涨停股票\n")
except Exception as e:
messagebox.showerror("错误", f"读取文件失败:{e}")
def show_result(self, text):
self.result_text.configure(state='normal')
self.result_text.insert(tk.END, text)
self.result_text.see(tk.END)
self.result_text.configure(state='disabled')
def clear_result(self):
self.result_text.configure(state='normal')
self.result_text.delete('1.0', tk.END)
self.result_text.configure(state='disabled')
def check_data(self):
if self.df is None:
messagebox.showwarning("提示", "请先加载数据文件")
return False
return True
def analyze_industry(self):
"""行业分布分析"""
self.clear_result()
if not self.check_data():
return
df = self.df
self.show_result("=" * 60 + "\n")
self.show_result(" 【一、行业分布分析】\n")
self.show_result("=" * 60 + "\n\n")
# 行业涨停数统计
industry_count = df['所属行业'].value_counts()
total = len(df)
self.show_result(f"📊 涨停股总数:{total} 只\n")
self.show_result(f"📊 涉及行业数:{len(industry_count)} 个\n\n")
self.show_result("-" * 55 + "\n")
self.show_result(f"{'行业':<12} {'涨停数':>6} {'占比':>8} {'连板数':>6} {'图形':<10}\n")
self.show_result("-" * 55 + "\n")
for industry, count in industry_count.items():
pct = count / total * 100
industry_df = df[df['所属行业'] == industry]
lianban = industry_df['连板数'].max()
bar = "█" * count
self.show_result(f"{industry:<12} {count:>6}只 {pct:>6.1f}% {lianban:>6}板 {bar}\n")
self.show_result("\n" + "-" * 55 + "\n\n")
# 结论
top3 = industry_count.head(3)
self.show_result("💡 分析结论:\n\n")
self.show_result(f" 当天涨停最集中的行业(TOP3):\n")
for i, (ind, cnt) in enumerate(top3.items(), 1):
self.show_result(f" {i}. {ind}:{cnt}只涨停\n")
if top3.iloc[0] >= 3:
self.show_result(f"\n 🔥 {top3.index[0]} 是当天最强主线方向,\n")
self.show_result(f" 资金高度集中,值得重点关注!\n")
else:
self.show_result(f"\n 各行业涨停分散,未形成明显主线。\n")
self.show_result(f" 需要结合后续走势确认持续性。\n")
def analyze_lianban(self):
"""连板分析"""
self.clear_result()
if not self.check_data():
return
df = self.df
self.show_result("=" * 60 + "\n")
self.show_result(" 【二、连板分析】\n")
self.show_result("=" * 60 + "\n\n")
# 连板数统计
lianban_count = df['连板数'].value_counts().sort_index(ascending=False)
self.show_result(f"📊 涨停股总数:{len(df)} 只\n\n")
self.show_result("-" * 55 + "\n")
self.show_result(f"{'连板数':>6} {'股票数':>6} {'占比':>8} {'图形':<20}\n")
self.show_result("-" * 55 + "\n")
for lianban, count in lianban_count.items():
pct = count / len(df) * 100
bar = "■" * count
self.show_result(f"{lianban:>6}板 {count:>6}只 {pct:>6.1f}% {bar}\n")
self.show_result("\n" + "-" * 55 + "\n\n")
# 各连板股票明细
self.show_result("📋 各连板股票明细:\n\n")
for lianban in sorted(df['连板数'].unique(), reverse=True):
if lianban < 2:
continue
stocks = df[df['连板数'] == lianban]
self.show_result(f" 【{lianban}连板】共{len(stocks)}只:\n")
for _, row in stocks.iterrows():
self.show_result(f" {row['代码']} {row['名称']:<8} "
f"行业:{row['所属行业']:<10} "
f"炸板:{row['炸板次数']}次 "
f"封单占比:{row['封单占成交']:.1f}%\n")
self.show_result("\n")
# 结论
lianban_2plus = len(df[df['连板数'] >= 2])
lianban_3plus = len(df[df['连板数'] >= 3])
self.show_result("💡 分析结论:\n\n")
self.show_result(f" 2连板以上:{lianban_2plus}只\n")
self.show_result(f" 3连板以上:{lianban_3plus}只\n\n")
if lianban_3plus >= 3:
self.show_result(" 🔥 高位连板多,市场情绪亢奋!\n")
self.show_result(" 追涨资金活跃,短线机会较多。\n")
elif lianban_2plus >= 5:
self.show_result(" 📈 连板梯队健康,市场有一定赚钱效应。\n")
self.show_result(" 可适当参与强势股的回封机会。\n")
else:
self.show_result(" ⚠️ 连板数量偏少,市场赚钱效应一般。\n")
self.show_result(" 建议以首板为主,谨慎追高。\n")
def analyze_fund(self):
"""资金流向分析"""
self.clear_result()
if not self.check_data():
return
df = self.df.copy()
self.show_result("=" * 60 + "\n")
self.show_result(" 【三、资金流向分析】\n")
self.show_result("=" * 60 + "\n\n")
df_sorted = df.sort_values(by='主力净流入-净占比', ascending=False)
# 主力净流入TOP10
self.show_result("📈 主力净流入占比 TOP10:\n\n")
self.show_result("-" * 60 + "\n")
self.show_result(f"{'代码':<8} {'名称':<10} {'行业':<10} {'净占比':>8} {'封单占比':>8} {'连板':>4}\n")
self.show_result("-" * 60 + "\n")
for _, row in df_sorted.head(10).iterrows():
self.show_result(f"{row['代码']:<8} {row['名称']:<10} {row['所属行业']:<10} "
f"{row['主力净流入-净占比']:>7.2f}% "
f"{row['封单占成交']:>7.1f}% "
f"{row['连板数']:>4}板\n")
self.show_result("\n" + "-" * 60 + "\n\n")
# 封单占比TOP10
df_seal = df.sort_values(by='封单占成交', ascending=False)
self.show_result("🔒 封单占比 TOP10(封板力度):\n\n")
self.show_result("-" * 60 + "\n")
self.show_result(f"{'代码':<8} {'名称':<10} {'行业':<10} {'封单占比':>8} {'炸板':>4} {'连板':>4}\n")
self.show_result("-" * 60 + "\n")
for _, row in df_seal.head(10).iterrows():
self.show_result(f"{row['代码']:<8} {row['名称']:<10} {row['所属行业']:<10} "
f"{row['封单占成交']:>7.1f}% "
f"{row['炸板次数']:>4}次 "
f"{row['连板数']:>4}板\n")
self.show_result("\n" + "-" * 60 + "\n\n")
# 综合质量评分
self.show_result("🏆 综合质量评分(净占比 x 封单占比 / 炸板次数):\n\n")
df['综合评分'] = df.apply(
lambda r: (r['主力净流入-净占比'] + 1) * r['封单占成交'] / max(r['炸板次数'], 0.1),
axis=1
)
df_quality = df.sort_values(by='综合评分', ascending=False)
self.show_result("-" * 65 + "\n")
self.show_result(f"{'代码':<8} {'名称':<10} {'行业':<10} {'净占比':>7} {'封单占比':>8} {'炸板':>4} {'评分':>8}\n")
self.show_result("-" * 65 + "\n")
for _, row in df_quality.head(10).iterrows():
self.show_result(f"{row['代码']:<8} {row['名称']:<10} {row['所属行业']:<10} "
f"{row['主力净流入-净占比']:>6.2f}% "
f"{row['封单占成交']:>7.1f}% "
f"{row['炸板次数']:>4}次 "
f"{row['综合评分']:>7.1f}\n")
self.show_result("\n" + "-" * 65 + "\n\n")
# 结论
positive_fund = len(df[df['主力净流入-净占比'] > 0])
self.show_result("💡 分析结论:\n\n")
self.show_result(f" 主力净流入为正的股票:{positive_fund}/{len(df)} 只\n")
top_quality = df_quality.head(3)
self.show_result(f"\n 🏆 综合质量最高的3只股票:\n")
for i, (_, row) in enumerate(top_quality.iterrows(), 1):
self.show_result(f" {i}. {row['代码']} {row['名称']} "
f"({row['所属行业']}) 评分:{row['综合评分']:.1f}\n")
self.show_result(f"\n 综合评分高的股票 = 封板力度强 + 资金流入 + 封板稳定\n")
self.show_result(f" 这些股票第二天溢价概率相对较高(仅供参考,非投资建议)\n")
def analyze_all(self):
self.clear_result()
if not self.check_data():
return
self.analyze_industry()
self.show_result("\n\n")
self.analyze_lianban()
self.show_result("\n\n")
self.analyze_fund()
if __name__ == '__main__':
root = tk.Tk()
app = AnalyzeZTApp(root)
root.mainloop()
|
免费评分
-
查看全部评分
|