[Asm] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import akshare as ak
import pandas as pd
import datetime
import os
from docx import Document
from openai import OpenAI
import json
# 设置OpenAI API配置
client = OpenAI(
base_url="https://api.deepseek.com/v1",
api_key="sk-00000000000"
)
def load_stock_codes():
if os.path.exists('mystock.txt'):
with open('mystock.txt', 'r') as f:
return f.read().strip().split('\n')
return []
def save_stock_codes(codes):
with open('mystock.txt', 'w') as f:
f.write('\n'.join(codes))
def load_analysis_history():
if os.path.exists('analysis_history.csv'):
return pd.read_csv('analysis_history.csv')
return pd.DataFrame(columns=['股票代码', '股票名称', '分析日期'])
def save_analysis_history(data):
df = pd.DataFrame(data)
df.to_csv('analysis_history.csv', index=False)
def prepare_stock_data(df, code):
# 获取股票名称
stock_info = ak.stock_info_a_code_name()
stock_name = stock_info[stock_info['code'] == code]['name'].iloc[0] if not stock_info[
stock_info['code'] == code].empty else '未知'
# 计算技术指标
ma5 = df['收盘'].rolling(5).mean()
ma10 = df['收盘'].rolling(10).mean()
ma20 = df['收盘'].rolling(20).mean()
ma60 = df['收盘'].rolling(60).mean()
# 计算RSI
delta = df['收盘'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
# 计算MACD
exp1 = df['收盘'].ewm(span=12, adjust=False).mean()
exp2 = df['收盘'].ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
# 计算布林带
middle_band = df['收盘'].rolling(20).mean()
std = df['收盘'].rolling(20).std()
upper_band = middle_band + (std * 2)
lower_band = middle_band - (std * 2)
# 准备分析数据,将所有数值转换为Python原生类型
analysis_data = {
'股票代码': code,
'股票名称': stock_name,
'最新收盘价': float(df['收盘'].iloc[-1]),
'近5日涨跌幅': f"{df['收盘'].pct_change().iloc[-5:].mean() * 100:.2f}%",
'成交量': int(df['成交量'].iloc[-1]),
'成交额': float(df['成交额'].iloc[-1]),
'换手率': float(df['换手率'].iloc[-1]),
'技术指标': {
'MA5': float(ma5.iloc[-1]),
'MA10': float(ma10.iloc[-1]),
'MA20': float(ma20.iloc[-1]),
'MA60': float(ma60.iloc[-1]),
'RSI': float(rsi.iloc[-1]),
'MACD': float(macd.iloc[-1]),
'MACD信号线': float(signal.iloc[-1]),
'布林带上轨': float(upper_band.iloc[-1]),
'布林带中轨': float(middle_band.iloc[-1]),
'布林带下轨': float(lower_band.iloc[-1])
}
}
return analysis_data
def analyze_stocks(codes, history_callback=None):
today = datetime.datetime.now()
today_str = today.strftime('%Y%m%d')
time_str = today.strftime('%H%M')
# 加载历史记录
history = load_analysis_history().to_dict('records')
# 用于保存新的分析记录
new_records = []
for code in codes:
try:
df = ak.stock_zh_a_hist(symbol=code, period="daily", start_date="20240101", adjust="qfq")
if df is not None and len(df) > 0:
# 准备分析数据
analysis_data = prepare_stock_data(df, code)
stock_name = analysis_data["股票名称"]
# 构建提示词
prompt = f"""请分析以下股票数据,给出详细的分析报告和投资建议:
{json.dumps(analysis_data, ensure_ascii=False, indent=2)}
请从以下几个方面进行分析:
1. 基本面分析
2. 技术面分析
3. 市场情绪分析
4. 风险提示
5. 投资建议
6. 预测下一个交易日涨幅
请用专业、客观的语言进行分析,并给出具体的操作建议。"""
# 调用deepseek-chat模型
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system",
"content": "你是一位专业的股票分析师,请对股票数据进行深入分析并提供投资建议。"},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=2000
)
# 获取分析结果
analysis_result = response.choices[0].message.content
# 保存为Word文档
doc = Document()
doc.add_heading(f'股票分析报告', 0)
doc.add_heading(f'股票代码: {code} ({stock_name})', level=1)
doc.add_paragraph(analysis_result)
# 使用指定格式保存文件
file_name = f"{code}_{stock_name}_{today_str}{time_str}.docx"
doc.save(file_name)
# 记录分析历史
record = {
'股票代码': code,
'股票名称': stock_name,
'分析日期': today.strftime('%Y-%m-%d %H:%M:%S'),
'文件路径': file_name
}
new_records.append(record)
except Exception as e:
print(f"分析股票 {code} 时出错: {str(e)}")
# 添加新记录到历史
history = new_records + history # 新记录在前
# 保存历史记录
save_analysis_history(history)
# 更新历史表格
if history_callback:
history_callback(history)
if new_records:
messagebox.showinfo("分析完成", f"分析结果已保存为 {new_records[0]['文件路径']} 等文件")
else:
messagebox.showerror("分析失败", "未能成功分析任何股票")
class StockAnalysisApp:
def __init__(self, root):
self.root = root
self.root.title("股票分析程序")
self.root.geometry("800x600")
# 加载历史记录
self.history = load_analysis_history().to_dict('records')
# 创建UI
self.create_widgets()
# 初始化历史表格
self.update_history_table()
def create_widgets(self):
# 创建主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 股票代码输入区域
ttk.Label(main_frame, text="分析股票代码:").grid(row=0, column=0, sticky=tk.W, pady=5)
self.stock_text = scrolledtext.ScrolledText(main_frame, height=3, width=50)
self.stock_text.grid(row=1, column=0, sticky=(tk.W, tk.E), pady=5)
# 分析按钮
self.analyze_button = ttk.Button(main_frame, text="分析股票", command=self.analyze)
self.analyze_button.grid(row=1, column=1, padx=10, pady=5)
# 历史记录表格
ttk.Label(main_frame, text="分析历史记录:").grid(row=2, column=0, sticky=tk.W, pady=5)
columns = ("股票代码", "股票名称", "分析日期")
self.history_tree = ttk.Treeview(main_frame, columns=columns, show="headings", height=10)
for col in columns:
self.history_tree.heading(col, text=col)
# 设置列宽
width = 150 if col != "分析日期" else 200
self.history_tree.column(col, width=width, anchor=tk.CENTER)
self.history_tree.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
# 添加垂直滚动条
scrollbar = ttk.Scrollbar(main_frame, orient=tk.VERTICAL, command=self.history_tree.yview)
self.history_tree.configure(yscroll=scrollbar.set)
scrollbar.grid(row=3, column=2, sticky=(tk.N, tk.S))
# 绑定点击事件
self.history_tree.bind("<<TreeviewSelect>>", self.on_history_select)
# 设置网格权重,使表格可以扩展
main_frame.grid_columnconfigure(0, weight=1)
main_frame.grid_rowconfigure(3, weight=1)
def update_history_table(self):
# 清空表格
for item in self.history_tree.get_children():
self.history_tree.delete(item)
# 填充表格(按日期倒序)
for record in self.history:
self.history_tree.insert("", tk.END, values=(
record['股票代码'],
record['股票名称'],
record['分析日期']
))
def on_history_select(self, event):
# 获取选中的项
selected_item = self.history_tree.selection()
if not selected_item:
return
# 获取股票代码
item = self.history_tree.item(selected_item[0])
stock_code = item['values'][0] # 股票代码在第一列
# 填充到文本框
self.stock_text.delete(1.0, tk.END)
self.stock_text.insert(tk.END, stock_code)
def analyze(self):
codes = self.stock_text.get(1.0, tk.END).strip().split('\n')
codes = [code.strip() for code in codes if code.strip()]
if not codes:
messagebox.showerror("错误", "请先输入股票代码!")
return
# 分析股票并更新历史记录
analyze_stocks(codes, self.update_history_table_callback)
def update_history_table_callback(self, history):
self.history = history
self.update_history_table()
if __name__ == "__main__":
root = tk.Tk()
app = StockAnalysisApp(root)
root.mainloop()