好友
阅读权限20
听众
最后登录1970-1-1
|
【Python实战】自己动手做一个 MySQL 数据库实时监控系统,附部分源码
前言
本文分享一个我在实际工作中开发并长期使用的运维小工具——不动产共享监控系统。项目用 Python + Flask 写的,适合有数据库运维需求的同学,也适合想学 Flask 全栈的新人练手。代码量不大,逻辑清晰,直接上效果图。
一、先看效果
1.1 监控大屏(首页)
整体采用浅色主题,蓝色点缀,布局如下:
- 左侧:实例导航树,可按数据库实例 → 数据库 → 表三级展开筛选
- 顶部卡片:实时展示 监控任务总数 / 今日成功次数 / 今日异常次数 / 最近同步时间
- 中部柱状图:今日各表新增记录数,自动按数据库分组着色,支持自定义排序
- 中部折线图:支持选择任意监控任务,查看近7天的记录数趋势
- 底部表格:今日所有监控结果明细,含执行状态、记录数、执行时间等字段
1.2 后台管理界面
后台分为以下几个管理模块:
| Tab |
功能 |
| 监控任务 |
创建/编辑/发布/删除监控任务 |
| 数据库连接 |
管理 MySQL 实例连接信息 |
| 数据库别名 |
为数据库/实例设置展示别名 |
| SQL 配置 |
配置自定义查询 SQL |
| 图表排序 |
拖拽调整柱状图各表显示顺序 |
二、主要功能介绍
2.1 多实例、多数据库监控
系统支持同时对多个 MySQL 实例进行监控,每个实例下可以监控多个数据库、多张表。连接信息集中在后台管理,无需改代码。
连接信息字段包括:实例名称 / 别名 / 主机 / 端口 / 用户名 / 密码 / 默认数据库,添加后可立即测试连通性。
2.2 灵活的监控任务配置
每个监控任务绑定一张表,支持以下配置:
- 监控频率:5分钟 / 15分钟 / 30分钟 / 1小时 / 每天
- 查询模式:系统自动生成
COUNT(*) 查询,也支持自定义 SQL(比如带 WHERE 条件的统计)
- 表别名:给业务表取一个友好的展示名称,在大屏上显示
- 发布控制:任务创建后默认不发布,发布后才会出现在监控大屏
同一张表可以创建多个不同条件的监控任务,互不干扰。
2.3 今日数据可视化
柱状图支持按数据库自动分组着色,8种颜色循环分配,相同数据库下的表使用同色系,视觉上一眼能区分数据来源。
- 异常任务(status=error)显示为红色
- 今日未运行的任务(pending)显示为浅灰色
- 图表下方附有颜色图例,标注每种颜色对应的数据库名
折线图支持从下拉框选择任意已发布任务,查看近7天该表的记录数历史走势,帮助判断数据流入是否正常。
2.4 图表排序管理
在后台管理的「图表排序」Tab 中,可以调整柱状图各表的显示顺序:
- 上移 / 下移:逐步微调位置
- 置顶:一键把某张表移到第一位
- 保存后前台立即生效
2.5 最近同步状态
首页顶部卡片「最近同步」会实时显示距上次成功同步的时间差,格式如下:
刚刚 — 1分钟内
3分钟前 — 3-59分钟
2小时前 — 1-23小时
1天前 — 超过1天
2.6 自动定时调度
系统基于 APScheduler 实现定时任务调度,所有已发布任务按配置的频率自动执行,无需手动触发。执行结果(成功/失败/记录数/错误信息)自动写入数据库,历史记录完整保留。
三、技术栈
| 层次 |
技术 |
| 后端框架 |
Python 3.12 + Flask 3.x |
| 任务调度 |
Flask-APScheduler |
| 数据存储 |
SQLite3(本地文件,零依赖) |
| 目标数据库 |
MySQL / MariaDB(通过 PyMySQL 连接) |
| 前端图表 |
ECharts 5.4.3 |
| 前端样式 |
原生 CSS 变量 + 自研组件,无前端框架依赖 |
| 认证 |
Session + SHA256 密码哈希 |
整个项目无需前端构建工具,模板直接用 Jinja2 渲染,部署只需要一个 python app.py,非常适合内网环境。
四、项目结构
db_monitor/
├── app.py # 主程序(路由 + API + 调度器)
├── monitor.db # SQLite 数据库(自动创建)
├── requirements.txt # 依赖列表
└── templates/
├── base.html # 公共布局模板(侧边栏 + 头部 + CSS变量)
├── login.html # 登录页
├── index.html # 监控大屏(首页)
└── admin.html # 后台管理界面
代码量:app.py 约 1400 行,4个模板文件合计约 3000 行,逻辑清晰分层。
五、数据库设计
系统使用 SQLite,共 5 张表:
-- 用户表
users (id, username, password, created_at)
-- MySQL 实例连接配置
db_connections (id, name, alias, host, port, username, password, database_name, status)
-- 数据库别名(实例 → 数据库 两级别名)
db_aliases (id, connection_id, db_name, alias)
-- 监控任务
monitor_tasks (id, name, db_connection_id, database_name, table_name,
table_alias, check_condition, sql_query, frequency,
status, is_published, sort_order, last_run_at, last_result)
-- 监控执行结果(历史记录)
monitor_results (id, task_id, record_count, status, error_message, executed_at)
数据库会在首次启动时自动创建,并自动执行字段迁移(ALTER TABLE ADD COLUMN IF NOT EXISTS),升级无需手动操作。
六、部署方法
6.1 环境要求
flask
flask-apscheduler
pymysql
一键安装:
pip install -r requirements.txt
6.2 启动服务
cd db_monitor
python app.py
默认监听 0.0.0.0:5000,浏览器访问 http://localhost:5000 即可。
默认账号密码:admin / admin123(进后台后请及时修改)
6.3 首次配置步骤
- 登录后台 → 数据库连接 → 新增一个 MySQL 实例,测试连通性
- 进入 监控任务 → 新建任务,选择目标表和监控频率
- 任务默认为「未发布」状态,点击发布后才会出现在监控大屏
- 可在 数据库别名 为实例和数据库设置展示友好的中文名称
- 在 图表排序 中调整柱状图各表显示顺序
七、几个细节说明
为什么用 SQLite 而不是 MySQL?
监控系统本身的数据量极小(任务配置 + 结果记录),SQLite 文件数据库完全够用,并且:
- 无需额外安装数据库服务
- 数据文件可以直接备份复制
- 内网部署简单,无端口暴露风险
为什么自研 CSS,不用 Bootstrap?
项目内网使用,要求无外网依赖,所有资源都通过 CDN 的 ECharts 是唯一例外(也可以改成本地引入)。纯原生 CSS 变量实现主题体系,改一个颜色变量全局生效。
APScheduler 的坑
Flask 开发模式下默认开启 use_reloader=True,会启动两个进程,导致定时任务跑两遍。建议生产部署时用:
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)
八、后续计划
- [ ] 支持邮件/企微告警(记录数异常时推送通知)
- [ ] 新增数据量趋势预警(基于历史均值自动判断异常)
- [ ] 支持 PostgreSQL / Oracle 等其他数据库类型
- [ ] 增加数据导出功能(Excel / CSV)
- [ ] 支持多用户权限管理
九、写在最后
这个工具从最初的"临时脚本"一路迭代成了现在的完整系统,主要解决的痛点就是:在多个 MySQL 实例之间,实时知道业务数据有没有在正常流入。
适合场景:数据同步任务监控 / ETL 管道状态观察 / 接口数据写入验证 / 日常数据质量检查。
源码已整理,如有需要可以回复帖子获取。
有问题欢迎在评论区留言,看到必回 🙌
[Python] 纯文本查看 复制代码 """
MySQL数据库监控系统
Flask + SQLite + APScheduler
"""
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, flash
from flask_apscheduler import APScheduler
from datetime import datetime, timedelta
import sqlite3
import json
import hashlib
import pymysql
import threading
import time
from functools import wraps
def now_local():
"""返回本地时间字符串(北京时间),格式 YYYY-MM-DD HH:MM:SS"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def today_local():
"""返回本地今日日期字符串(北京时间),格式 YYYY-MM-DD"""
return datetime.now().strftime('%Y-%m-%d')
app = Flask(__name__)
app.secret_key = 'db_monitor_secret_key_2024'
# 定时任务配置
class Config:
SCHEDULER_API_ENABLED = True
SCHEDULER_TIMEZONE = 'Asia/Shanghai'
app.config.from_object(Config)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
DATABASE = 'monitor.db'
def get_db():
"""获取数据库连接"""
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
return conn
import contextlib
@contextlib.contextmanager
def get_db_ctx():
"""数据库连接上下文管理器,自动关闭连接"""
conn = get_db()
try:
yield conn
finally:
conn.close()
def init_db():
"""初始化数据库表"""
conn = get_db()
cursor = conn.cursor()
# 用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 数据库连接配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_connections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
alias TEXT,
db_type TEXT DEFAULT 'mysql',
host TEXT NOT NULL,
port INTEGER DEFAULT 3306,
username TEXT NOT NULL,
password TEXT NOT NULL,
database_name TEXT,
status INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 监控任务表
cursor.execute('''
CREATE TABLE IF NOT EXISTS monitor_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
db_connection_id INTEGER NOT NULL,
database_name TEXT NOT NULL,
table_name TEXT NOT NULL,
table_alias TEXT,
check_condition TEXT,
sql_query TEXT,
frequency TEXT DEFAULT '5min',
status INTEGER DEFAULT 1,
last_run_at TIMESTAMP,
last_result INTEGER DEFAULT 0,
last_error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (db_connection_id) REFERENCES db_connections(id)
)
''')
# 监控结果表
cursor.execute('''
CREATE TABLE IF NOT EXISTS monitor_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER NOT NULL,
record_count INTEGER DEFAULT 0,
status TEXT DEFAULT 'success',
error_message TEXT,
executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (task_id) REFERENCES monitor_tasks(id)
)
''')
# 兼容旧数据库:添加必要字段
cursor.execute("PRAGMA table_info(monitor_tasks)")
columns = [col[1] for col in cursor.fetchall()]
if 'sql_query' not in columns:
cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN sql_query TEXT')
cursor.execute("PRAGMA table_info(monitor_tasks)")
col_info = {col[1]: col for col in cursor.fetchall()}
if 'last_result' not in col_info:
cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN last_result INTEGER DEFAULT 0')
if 'is_published' not in col_info:
cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN is_published INTEGER DEFAULT 0')
if 'table_alias' not in col_info:
cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN table_alias TEXT')
if 'sort_order' not in col_info:
cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN sort_order INTEGER DEFAULT 0')
# 兼容旧 db_connections:添加 alias 字段
cursor.execute("PRAGMA table_info(db_connections)")
conn_col_info = {col[1]: col for col in cursor.fetchall()}
if 'alias' not in conn_col_info:
cursor.execute('ALTER TABLE db_connections ADD COLUMN alias TEXT')
# 数据库别名表(实例下的数据库级别别名)
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_aliases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
connection_id INTEGER NOT NULL,
db_name TEXT NOT NULL,
alias TEXT DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (connection_id) REFERENCES db_connections(id),
UNIQUE(connection_id, db_name)
)
''')
# 创建默认管理员账号
default_pass = hashlib.sha256('admin123'.encode()).hexdigest()
cursor.execute('''
INSERT OR IGNORE INTO users (username, password)
VALUES (?, ?)
''', ('admin', default_pass))
conn.commit()
conn.close()
def login_required(f):
"""登录验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
if request.is_json:
return jsonify({'success': False, 'message': '请先登录'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# ==================== 页面路由 ====================
@app.route('/')
def index():
"""主界面 - 监控结果展示"""
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
"""登录页面"""
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
conn = get_db()
cursor = conn.cursor()
hashed_pass = hashlib.sha256(password.encode()).hexdigest()
user = cursor.execute(
'SELECT * FROM users WHERE username = ? AND password = ?',
(username, hashed_pass)
).fetchone()
conn.close()
if user:
session['user_id'] = user['id']
session['username'] = user['username']
return redirect(url_for('admin'))
else:
flash('用户名或密码错误', 'error')
return render_template('login.html')
@app.route('/logout')
def logout():
"""退出登录"""
session.clear()
return redirect(url_for('login'))
@app.route('/admin')
@login_required
def admin():
"""后台管理界面"""
return render_template('admin.html')
# ==================== API接口 ====================
@app.route('/api/monitor/results')
def get_monitor_results():
"""获取监控结果列表"""
with get_db_ctx() as conn:
cursor = conn.cursor()
# 预加载数据库别名
cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
db_alias_map = {}
for r in cursor.fetchall():
db_alias_map[(r['connection_id'], r['db_name'])] = r['alias'] or ''
# 获取今天的结果(只显示已发布且未删除的任务)
cursor.execute('''
SELECT
mr.id,
mt.id as task_id,
mt.db_connection_id,
dc.name as instance_name,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
mt.database_name,
mt.table_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
mt.table_alias,
dc.alias as instance_alias,
mr.record_count,
mr.status,
mr.error_message,
mr.executed_at,
mt.sql_query
FROM monitor_results mr
JOIN monitor_tasks mt ON mr.task_id = mt.id
JOIN db_connections dc ON mt.db_connection_id = dc.id
WHERE substr(mr.executed_at, 1, 10) = ?
AND mt.status = 1
AND mt.is_published = 1
ORDER BY mr.executed_at DESC
LIMIT 200
''', (today_local(),))
results = []
for row in cursor.fetchall():
executed_at = row['executed_at']
# 查找数据库别名
db_alias = db_alias_map.get((row['db_connection_id'], row['database_name']), '')
results.append({
'id': row['id'],
'task_id': row['task_id'],
'instance_name': row['instance_name'],
'instance_display': row['instance_display'],
'instance_alias': row['instance_alias'] or '',
'database_name': row['database_name'],
'database_display': db_alias if db_alias else row['database_name'],
'database_alias': db_alias,
'table_name': row['table_name'],
'table_display': row['table_display'],
'table_alias': row['table_alias'] or '',
'record_count': row['record_count'] if row['record_count'] is not None else 0,
'status': row['status'],
'error_message': row['error_message'],
'executed_at': executed_at,
'sql_query': row['sql_query']
})
return jsonify({'success': True, 'data': results})
@app.route('/api/monitor/instances')
def get_monitor_instances():
"""获取监控实例列表(用于左侧导航树)"""
with get_db_ctx() as conn:
cursor = conn.cursor()
# 后台管理:显示所有任务(包括未发布的)
include_unpublished = request.args.get('include_unpublished', '0') == '1'
# 预加载数据库别名
cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
db_alias_map = {}
for r in cursor.fetchall():
db_alias_map[(r['connection_id'], r['db_name'])] = r['alias'] or ''
if include_unpublished:
# 后台管理 - 显示所有任务
cursor.execute('''
SELECT DISTINCT
dc.id as connection_id,
dc.name as instance_name,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
dc.alias as instance_alias,
mt.database_name,
mt.table_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
mt.table_alias,
mt.id as task_id,
mt.status as task_status,
mt.is_published,
mt.last_result,
mt.last_run_at,
mt.sql_query
FROM db_connections dc
LEFT JOIN monitor_tasks mt ON dc.id = mt.db_connection_id AND mt.status = 1
WHERE dc.status = 1
ORDER BY dc.name, mt.database_name, mt.table_name
''')
else:
# 监控面板 - 只显示已发布任务
cursor.execute('''
SELECT DISTINCT
dc.id as connection_id,
dc.name as instance_name,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
dc.alias as instance_alias,
mt.database_name,
mt.table_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
mt.table_alias,
mt.id as task_id,
mt.status as task_status,
mt.is_published,
mt.last_result,
mt.last_run_at,
mt.sql_query
FROM db_connections dc
LEFT JOIN monitor_tasks mt ON dc.id = mt.db_connection_id AND mt.status = 1 AND mt.is_published = 1
WHERE dc.status = 1
ORDER BY dc.name, mt.database_name, mt.table_name
''')
rows = cursor.fetchall()
instances = {}
for row in rows:
inst_name = row['instance_name']
db_name = row['database_name']
if inst_name not in instances:
instances[inst_name] = {
'id': row['connection_id'],
'name': inst_name,
'display': row['instance_display'],
'alias': row['instance_alias'] or '',
'databases': {}
}
if db_name and db_name not in instances[inst_name]['databases']:
db_alias = db_alias_map.get((row['connection_id'], db_name), '')
instances[inst_name]['databases'][db_name] = {
'name': db_name,
'alias': db_alias,
'display': db_alias if db_alias else db_name,
'tables': []
}
if db_name and row['table_name']:
instances[inst_name]['databases'][db_name]['tables'].append({
'task_id': row['task_id'],
'name': row['table_name'],
'display': row['table_display'],
'alias': row['table_alias'] or '',
'status': row['task_status'],
'is_published': row['is_published'] if row['is_published'] is not None else 0,
'last_result': row['last_result'] if row['last_result'] is not None else 0,
'last_run_at': row['last_run_at'],
'sql_query': row['sql_query']
})
return jsonify({'success': True, 'data': list(instances.values())})
@app.route('/api/monitor/summary')
def get_monitor_summary():
"""获取监控汇总统计"""
with get_db_ctx() as conn:
cursor = conn.cursor()
# 统计有已发布监控任务的数据库连接数(实例数)
cursor.execute('''
SELECT COUNT(DISTINCT db_connection_id) as instance_count
FROM monitor_tasks WHERE status = 1 AND is_published = 1
''')
instance_stats = cursor.fetchone()
cursor.execute('''
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as active_tasks,
SUM(CASE WHEN last_result > 0 THEN 1 ELSE 0 END) as has_data_tasks
FROM monitor_tasks
WHERE status = 1
''')
task_stats = cursor.fetchone()
cursor.execute('''
SELECT
COUNT(*) as today_records,
SUM(mr.record_count) as total_records
FROM monitor_results mr
JOIN monitor_tasks mt ON mr.task_id = mt.id
WHERE substr(mr.executed_at, 1, 10) = ?
AND mt.status = 1
AND mt.is_published = 1
''', (today_local(),))
today_stats = cursor.fetchone()
# 最近一次成功同步时间
cursor.execute('''
SELECT mr.executed_at
FROM monitor_results mr
JOIN monitor_tasks mt ON mr.task_id = mt.id
WHERE mr.status = 'success'
AND mt.status = 1
AND mt.is_published = 1
ORDER BY mr.executed_at DESC
LIMIT 1
''')
last_sync_row = cursor.fetchone()
last_sync_at = last_sync_row['executed_at'] if last_sync_row else None
cursor.execute('''
SELECT COUNT(*) as error_count
FROM monitor_results mr
JOIN monitor_tasks mt ON mr.task_id = mt.id
WHERE mr.status = 'error' AND substr(mr.executed_at, 1, 10) = ?
AND mt.status = 1
AND mt.is_published = 1
''', (today_local(),))
error_stats = cursor.fetchone()
return jsonify({
'success': True,
'data': {
'total_instances': instance_stats['instance_count'] or 0,
'total_tasks': task_stats['total_tasks'] or 0,
'active_tasks': task_stats['active_tasks'] or 0,
'has_data_tasks': task_stats['has_data_tasks'] or 0,
'today_records': today_stats['today_records'] or 0,
'today_total_records': today_stats['total_records'] or 0,
'today_errors': error_stats['error_count'] or 0,
'last_sync_at': last_sync_at
}
})
@app.route('/api/monitor/chart-data')
def get_chart_data():
"""获取图表数据:近N天趋势 + 今日柱状图"""
# 支持 days=7 或 days=30
try:
n_days = int(request.args.get('days', 7))
except (TypeError, ValueError):
n_days = 7
n_days = max(7, min(30, n_days))
# 支持按 task_id 过滤趋势图
task_id_filter = request.args.get('task_id')
days = [(datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d') for i in range(n_days - 1, -1, -1)]
with get_db_ctx() as conn:
cursor = conn.cursor()
# 预加载数据库别名(用于图表 label)
cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
db_alias_map_chart = {}
for r in cursor.fetchall():
db_alias_map_chart[(r['connection_id'], r['db_name'])] = r['alias'] or ''
# 构建趋势查询(支持 task_id 过滤)
trend_where = "WHERE substr(mr.executed_at, 1, 10) >= ? AND mt.status = 1 AND mt.is_published = 1"
trend_params = [days[0]]
if task_id_filter:
trend_where += " AND mt.id = ?"
trend_params.append(int(task_id_filter))
cursor.execute(f"""
SELECT substr(mr.executed_at, 1, 10) as date,
mt.id as task_id,
mt.database_name,
mt.table_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
dc.id as connection_id,
dc.name as instance_name,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
COALESCE(SUM(mr.record_count), 0) as total_count
FROM monitor_results mr
JOIN monitor_tasks mt ON mr.task_id = mt.id
JOIN db_connections dc ON mt.db_connection_id = dc.id
{trend_where}
GROUP BY substr(mr.executed_at, 1, 10), mt.id
ORDER BY date ASC
""", trend_params)
rows = cursor.fetchall()
table_map = {}
for row in rows:
key = str(row['task_id'])
db_alias = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
db_display = db_alias if db_alias else row['database_name']
label = db_display + ' · ' + row['table_display']
if key not in table_map:
table_map[key] = {'label': label, 'data': {d: 0 for d in days}}
if row['date'] in table_map[key]['data']:
table_map[key]['data'][row['date']] = int(row['total_count'] or 0)
# 今日柱状图:以全部已发布任务为基础,LEFT JOIN 今日最新结果
# 确保今日还没跑的任务也显示(count=0),不遗漏任何任务
cursor.execute("""
SELECT mt.id as task_id, mt.table_name, mt.sort_order,
mt.database_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
dc.id as connection_id,
dc.name as instance_name,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
latest.record_count, latest.status, latest.executed_at
FROM monitor_tasks mt
JOIN db_connections dc ON mt.db_connection_id = dc.id
LEFT JOIN (
SELECT mr.task_id,
mr.record_count,
mr.status,
mr.executed_at
FROM monitor_results mr
INNER JOIN (
SELECT task_id, MAX(executed_at) as max_at
FROM monitor_results
WHERE substr(executed_at, 1, 10) = ?
GROUP BY task_id
) sub ON mr.task_id = sub.task_id AND mr.executed_at = sub.max_at
) latest ON latest.task_id = mt.id
WHERE mt.status = 1
AND mt.is_published = 1
ORDER BY mt.sort_order ASC, mt.id ASC
""", (today_local(),))
today_rows = cursor.fetchall()
seen = set()
today_bars = []
all_tasks = [] # 用于前端构建下拉列表
for row in today_rows:
key = str(row['task_id'])
if key not in seen:
seen.add(key)
db_alias_bar = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
db_display_bar = db_alias_bar if db_alias_bar else row['database_name']
today_bars.append({
'task_id': row['task_id'],
'name': row['table_display'],
'raw_name': row['table_name'],
'db_display': db_display_bar,
'instance': row['instance_display'],
'sort_order': row['sort_order'] if row['sort_order'] is not None else 0,
'count': int(row['record_count'] or 0),
# LEFT JOIN 时 status/time 可能为 None(任务今日未运行)
'status': row['status'] if row['status'] else 'pending',
'time': row['executed_at'] if row['executed_at'] else None
})
# 获取所有已发布任务(用于下拉选择)
cursor.execute("""
SELECT mt.id, mt.database_name, mt.table_name,
CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
dc.id as connection_id,
CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display
FROM monitor_tasks mt
JOIN db_connections dc ON mt.db_connection_id = dc.id
WHERE mt.status = 1 AND mt.is_published = 1
ORDER BY dc.name, mt.database_name, mt.table_name
""")
for row in cursor.fetchall():
db_alias = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
db_display = db_alias if db_alias else row['database_name']
all_tasks.append({
'task_id': row['id'],
'label': db_display + ' · ' + row['table_display']
})
series = [{'name': v['label'], 'task_id': k, 'data': [v['data'][d] for d in days]} for k, v in table_map.items()]
return jsonify({'success': True, 'data': {
'days': [d[5:] for d in days],
'series': series,
'today_bars': today_bars,
'all_tasks': all_tasks
}})
# 数据库连接管理API
@app.route('/api/db/connections', methods=['GET', 'POST'])
@login_required
def db_connections():
"""数据库连接管理"""
if request.method == 'GET':
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM db_connections WHERE status = 1 ORDER BY id DESC')
connections = []
for row in cursor.fetchall():
connections.append({
'id': row['id'],
'name': row['name'],
'alias': row['alias'] or '',
'display': row['alias'] if row['alias'] else row['name'],
'db_type': row['db_type'],
'host': row['host'],
'port': row['port'],
'username': row['username'],
'database_name': row['database_name'],
'status': row['status']
})
return jsonify({'success': True, 'data': connections})
elif request.method == 'POST':
data = request.json
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO db_connections (name, alias, db_type, host, port, username, password, database_name)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
data.get('name'),
data.get('alias', ''),
data.get('db_type', 'mysql'),
data.get('host'),
data.get('port', 3306),
data.get('username'),
data.get('password'),
data.get('database_name')
))
conn.commit()
return jsonify({'success': True, 'message': '数据库连接添加成功'})
@app.route('/api/db/connections/<int:conn_id>', methods=['PUT', 'DELETE'])
@login_required
def db_connection_detail(conn_id):
"""单个数据库连接管理"""
if request.method == 'PUT':
data = request.json
# 快捷别名更新:只传 alias 字段时,只更新 alias
if 'alias' in data and 'host' not in data:
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE db_connections SET alias = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
(data.get('alias', ''), conn_id))
conn.commit()
return jsonify({'success': True, 'message': '别名更新成功'})
with get_db_ctx() as conn:
cursor = conn.cursor()
if data.get('password'):
cursor.execute('''
UPDATE db_connections
SET name = ?, alias = ?, host = ?, port = ?, username = ?, password = ?, database_name = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (
data.get('name'),
data.get('alias', ''),
data.get('host'),
data.get('port'),
data.get('username'),
data.get('password'),
data.get('database_name'),
conn_id
))
else:
cursor.execute('''
UPDATE db_connections
SET name = ?, alias = ?, host = ?, port = ?, username = ?, database_name = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (
data.get('name'),
data.get('alias', ''),
data.get('host'),
data.get('port'),
data.get('username'),
data.get('database_name'),
conn_id
))
conn.commit()
return jsonify({'success': True, 'message': '数据库连接更新成功'})
elif request.method == 'DELETE':
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE db_connections SET status = 0 WHERE id = ?', (conn_id,))
conn.commit()
return jsonify({'success': True, 'message': '数据库连接已删除'})
@app.route('/api/db/database-alias', methods=['GET', 'PUT'])
@login_required
def database_alias():
"""数据库别名管理(实例下的数据库级别别名)"""
if request.method == 'GET':
conn_id = request.args.get('connection_id', type=int)
db_name = request.args.get('db_name', '')
with get_db_ctx() as conn:
cursor = conn.cursor()
if conn_id and db_name:
cursor.execute('SELECT alias FROM db_aliases WHERE connection_id = ? AND db_name = ?', (conn_id, db_name))
row = cursor.fetchone()
return jsonify({'success': True, 'alias': row['alias'] if row else ''})
else:
cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
aliases = [{'connection_id': r['connection_id'], 'db_name': r['db_name'], 'alias': r['alias'] or ''} for r in cursor.fetchall()]
return jsonify({'success': True, 'data': aliases})
elif request.method == 'PUT':
data = request.json
conn_id = data.get('connection_id')
db_name = data.get('db_name')
alias = data.get('alias', '')
if not conn_id or not db_name:
return jsonify({'success': False, 'message': '缺少 connection_id 或 db_name'})
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO db_aliases (connection_id, db_name, alias, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(connection_id, db_name) DO UPDATE SET alias = ?, updated_at = CURRENT_TIMESTAMP
''', (conn_id, db_name, alias, alias))
conn.commit()
return jsonify({'success': True, 'message': '数据库别名更新成功'})
@app.route('/api/db/test', methods=['POST'])
@login_required
def test_db_connection():
"""测试数据库连接"""
data = request.json
try:
if data.get('db_type') in ('mysql', None, ''):
connection = pymysql.connect(
host=data.get('host'),
port=int(data.get('port', 3306)),
user=data.get('username'),
password=data.get('password'),
database=data.get('database_name') or None,
connect_timeout=5
)
connection.close()
return jsonify({'success': True, 'message': '连接成功'})
else:
return jsonify({'success': False, 'message': '暂不支持该数据库类型'})
except Exception as e:
return jsonify({'success': False, 'message': f'连接失败: {str(e)}'})
@app.route('/api/db/execute_sql', methods=['POST'])
@login_required
def execute_sql():
"""执行SQL查询(仅SELECT),返回 Dict 格式数据"""
data = request.json
conn_id = data.get('connection_id')
sql = data.get('sql', '').strip()
database_name = data.get('database_name')
if not sql:
return jsonify({'success': False, 'message': 'SQL语句不能为空'})
if not sql.upper().startswith('SELECT'):
return jsonify({'success': False, 'message': '仅支持 SELECT 查询语句'})
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM db_connections WHERE id = ? AND status = 1', (conn_id,))
db_conn = cursor.fetchone()
if not db_conn:
return jsonify({'success': False, 'message': '数据库连接不存在或已停用'})
try:
connection = pymysql.connect(
host=db_conn['host'],
port=db_conn['port'],
user=db_conn['username'],
password=db_conn['password'],
database=database_name or None,
connect_timeout=5,
charset='utf8mb4'
)
# 使用 DictCursor,返回 dict 格式的 rows
from pymysql.cursors import DictCursor
with connection.cursor(DictCursor) as cursor:
cursor.execute(sql)
columns = list(cursor.description) if cursor.description else []
col_names = [c[0] for c in columns]
rows = cursor.fetchall() # 每行都是 dict
# 限制返回条数
rows = rows[:100]
row_count = len(rows)
has_more = len(rows) == 100
connection.close()
return jsonify({
'success': True,
'message': f'查询成功,返回 {row_count} 条记录' + ('(已截断,仅显示前100条)' if has_more else ''),
'data': {'columns': col_names, 'rows': rows, 'row_count': row_count}
})
except Exception as e:
return jsonify({'success': False, 'message': f'SQL执行失败: {str(e)}'})
@app.route('/api/db/databases', methods=['POST'])
@login_required
def get_databases():
"""获取数据库列表"""
data = request.json
conn_id = data.get('connection_id')
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM db_connections WHERE id = ?', (conn_id,))
db_conn = cursor.fetchone()
if not db_conn:
return jsonify({'success': False, 'message': '数据库连接不存在'})
try:
connection = pymysql.connect(
host=db_conn['host'],
port=db_conn['port'],
user=db_conn['username'],
password=db_conn['password'],
connect_timeout=5
)
with connection.cursor() as cursor:
cursor.execute('SHOW DATABASES')
databases = [row[0] for row in cursor.fetchall()]
databases = [db for db in databases if db not in ['information_schema', 'mysql', 'performance_schema', 'sys']]
connection.close()
return jsonify({'success': True, 'data': databases})
except Exception as e:
return jsonify({'success': False, 'message': f'获取数据库列表失败: {str(e)}'})
@app.route('/api/db/tables', methods=['POST'])
@login_required
def get_tables():
"""获取表列表"""
data = request.json
conn_id = data.get('connection_id')
database_name = data.get('database_name')
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM db_connections WHERE id = ?', (conn_id,))
db_conn = cursor.fetchone()
if not db_conn:
return jsonify({'success': False, 'message': '数据库连接不存在'})
try:
connection = pymysql.connect(
host=db_conn['host'],
port=db_conn['port'],
user=db_conn['username'],
password=db_conn['password'],
database=database_name,
connect_timeout=5
)
with connection.cursor() as cursor:
cursor.execute('SHOW TABLES')
tables = [row[0] for row in cursor.fetchall()]
connection.close()
return jsonify({'success': True, 'data': tables})
except Exception as e:
return jsonify({'success': False, 'message': f'获取表列表失败: {str(e)}'})
# 监控任务管理API
@app.route('/api/monitor/tasks', methods=['GET', 'POST'])
@login_required
def monitor_tasks():
"""监控任务管理"""
if request.method == 'GET':
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT mt.*, dc.name as db_connection_name
FROM monitor_tasks mt
JOIN db_connections dc ON mt.db_connection_id = dc.id
WHERE mt.status = 1
ORDER BY dc.name, mt.database_name, mt.table_name
''')
tasks = []
for row in cursor.fetchall():
tasks.append({
'id': row['id'],
'name': row['name'],
'db_connection_id': row['db_connection_id'],
'db_connection_name': row['db_connection_name'],
'database_name': row['database_name'],
'table_name': row['table_name'],
'table_alias': row['table_alias'] if row['table_alias'] is not None else '',
'check_condition': row['check_condition'],
'sql_query': row['sql_query'],
'frequency': row['frequency'],
'status': row['status'],
'is_published': row['is_published'] if row['is_published'] is not None else 0,
'last_run_at': row['last_run_at'],
'last_result': row['last_result'] if row['last_result'] is not None else 0,
'last_error': row['last_error'],
'sort_order': row['sort_order'] if row['sort_order'] is not None else 0
})
return jsonify({'success': True, 'data': tasks})
elif request.method == 'POST':
data = request.json
tasks_data = data.get('tasks', [])
if not tasks_data:
return jsonify({'success': False, 'message': '没有要创建的任务'})
with get_db_ctx() as conn:
cursor = conn.cursor()
# 允许同一张表创建多个不同监控任务,直接插入所有任务
new_tasks = tasks_data
# 插入新任务
created_ids = []
for task in new_tasks:
cursor.execute('''
INSERT INTO monitor_tasks (name, db_connection_id, database_name, table_name, check_condition, sql_query, frequency)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
task.get('name'),
task.get('db_connection_id'),
task.get('database_name'),
task.get('table_name'),
task.get('check_condition'),
task.get('sql_query'),
task.get('frequency', '5min')
))
created_ids.append(cursor.lastrowid)
conn.commit()
load_scheduler_jobs()
# 立即执行新建的任务,让监控面板立刻能看到数据
for task_id in created_ids:
try:
execute_monitor_task(task_id)
except Exception:
pass # 忽略执行错误,不影响创建成功的结果
msg = f'成功创建 {len(new_tasks)} 个监控任务'
return jsonify({'success': True, 'message': msg})
@app.route('/api/monitor/tasks/<int:task_id>', methods=['GET', 'PUT', 'DELETE'])
@login_required
def monitor_task_detail(task_id):
"""单个监控任务管理"""
conn = get_db()
cursor = conn.cursor()
if request.method == 'GET':
cursor.execute('''
SELECT mt.*, dc.name as db_connection_name
FROM monitor_tasks mt
JOIN db_connections dc ON mt.db_connection_id = dc.id
WHERE mt.id = ?
''', (task_id,))
row = cursor.fetchone()
conn.close()
if not row:
return jsonify({'success': False, 'message': '任务不存在'})
return jsonify({
'success': True,
'data': {
'id': row['id'],
'name': row['name'],
'db_connection_id': row['db_connection_id'],
'db_connection_name': row['db_connection_name'],
'database_name': row['database_name'],
'table_name': row['table_name'],
'check_condition': row['check_condition'],
'sql_query': row['sql_query'],
'frequency': row['frequency'],
'status': row['status'],
'is_published': row['is_published'] if row['is_published'] is not None else 0,
'last_run_at': row['last_run_at'],
'last_result': row['last_result'] if row['last_result'] is not None else 0,
'last_error': row['last_error']
}
})
elif request.method == 'PUT':
data = request.json
cursor.execute('''
UPDATE monitor_tasks
SET check_condition = ?, sql_query = ?, frequency = ?, table_alias = ?
WHERE id = ?
''', (
data.get('check_condition'),
data.get('sql_query'),
data.get('frequency'),
data.get('table_alias', ''),
task_id
))
conn.commit()
conn.close()
load_scheduler_jobs()
return jsonify({'success': True, 'message': '监控任务更新成功'})
elif request.method == 'DELETE':
cursor.execute('UPDATE monitor_tasks SET status = 0 WHERE id = ?', (task_id,))
conn.commit()
conn.close()
load_scheduler_jobs()
return jsonify({'success': True, 'message': '监控任务已删除'})
@app.route('/api/monitor/tasks/<int:task_id>/publish', methods=['POST'])
@login_required
def publish_task(task_id):
"""发布任务到监控面板"""
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE monitor_tasks SET is_published = 1 WHERE id = ?', (task_id,))
conn.commit()
return jsonify({'success': True, 'message': '任务已发布到监控面板'})
@app.route('/api/monitor/tasks/<int:task_id>/unpublish', methods=['POST'])
@login_required
def unpublish_task(task_id):
"""撤销任务(从监控面板移除)"""
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE monitor_tasks SET is_published = 0 WHERE id = ?', (task_id,))
conn.commit()
return jsonify({'success': True, 'message': '任务已从监控面板撤销'})
@app.route('/api/monitor/tasks/batch-delete', methods=['POST'])
@login_required
def batch_delete_tasks():
"""批量删除监控任务"""
data = request.json
task_ids = data.get('task_ids', [])
if not task_ids:
return jsonify({'success': False, 'message': '未选择任务'})
with get_db_ctx() as conn:
cursor = conn.cursor()
placeholders = ','.join(['?' for _ in task_ids])
cursor.execute(f'UPDATE monitor_tasks SET status = 0 WHERE id IN ({placeholders})', task_ids)
conn.commit()
load_scheduler_jobs()
return jsonify({'success': True, 'message': f'成功删除 {len(task_ids)} 个监控任务'})
@app.route('/api/monitor/tasks/batch-sort', methods=['POST'])
@login_required
def batch_sort_tasks():
"""批量更新任务排序权重"""
data = request.json
# [{task_id: 1, sort_order: 0}, {task_id: 2, sort_order: 1}, ...]
orders = data.get('orders', [])
if not orders:
return jsonify({'success': False, 'message': '无排序数据'})
with get_db_ctx() as conn:
cursor = conn.cursor()
for item in orders:
cursor.execute('UPDATE monitor_tasks SET sort_order = ? WHERE id = ?',
(item.get('sort_order', 0), item.get('task_id')))
conn.commit()
return jsonify({'success': True, 'message': '排序已保存'})
# ==================== 定时任务执行 ====================
def execute_monitor_task(task_id):
"""执行单个监控任务"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
SELECT mt.*, dc.host, dc.port, dc.username, dc.password
FROM monitor_tasks mt
JOIN db_connections dc ON mt.db_connection_id = dc.id
WHERE mt.id = ? AND mt.status = 1
''', (task_id,))
task = cursor.fetchone()
if not task:
conn.close()
return
try:
# 连接数据库
connection = pymysql.connect(
host=task['host'],
port=task['port'],
user=task['username'],
password=task['password'],
database=task['database_name'],
connect_timeout=10,
charset='utf8mb4'
)
# 构建SQL:如果有自定义SQL直接用;否则用默认模板
if task['sql_query'] and task['sql_query'].strip():
sql = task['sql_query']
else:
# 默认SQL:统计今日数据
check_condition = task['check_condition'] or ''
if check_condition.strip():
if 'where' in check_condition.lower():
sql = f"SELECT COUNT(*) FROM {task['table_name']} {check_condition}"
else:
sql = f"SELECT COUNT(*) FROM {task['table_name']} WHERE {check_condition}"
else:
today = datetime.now().strftime('%Y-%m-%d')
sql = f"SELECT COUNT(*) FROM {task['table_name']} WHERE DATE(created_at) = '{today}'"
# 执行SQL,使用普通游标
with connection.cursor() as cur:
cur.execute(sql)
result = cur.fetchone()
# fetchone() 返回 (count,) 元组,result[0] 即为数量
if result and len(result) > 0:
count = int(result[0]) if result[0] is not None else 0
else:
count = 0
connection.close()
local_now = now_local()
local_today = today_local()
# 更新任务状态
cursor.execute('''
UPDATE monitor_tasks SET last_run_at = ?, last_result = ?, last_error = NULL
WHERE id = ?
''', (local_now, count, task_id))
# 更新今天的监控结果(如果已有今天的结果),否则插入新记录
# 先检查今天是否已有该任务的执行记录(用本地日期比较)
cursor.execute('''
SELECT id FROM monitor_results
WHERE task_id = ? AND substr(executed_at, 1, 10) = ?
''', (task_id, local_today))
existing = cursor.fetchone()
if existing:
# 更新今天的记录
cursor.execute('''
UPDATE monitor_results
SET record_count = ?, status = 'success', error_message = NULL, executed_at = ?
WHERE id = ?
''', (count, local_now, existing['id']))
else:
# 插入新记录
cursor.execute('''
INSERT INTO monitor_results (task_id, record_count, status, executed_at)
VALUES (?, ?, ?, ?)
''', (task_id, count, 'success', local_now))
conn.commit()
except Exception as e:
error_msg = str(e)
local_now = now_local()
local_today = today_local()
# 更新任务状态为错误
cursor.execute('''
UPDATE monitor_tasks SET last_run_at = ?, last_error = ?
WHERE id = ?
''', (local_now, error_msg, task_id))
# 更新今天的监控结果(如果已有今天的结果),否则插入新记录
cursor.execute('''
SELECT id FROM monitor_results
WHERE task_id = ? AND substr(executed_at, 1, 10) = ?
''', (task_id, local_today))
existing = cursor.fetchone()
if existing:
cursor.execute('''
UPDATE monitor_results
SET record_count = 0, status = 'error', error_message = ?, executed_at = ?
WHERE id = ?
''', (error_msg, local_now, existing['id']))
else:
cursor.execute('''
INSERT INTO monitor_results (task_id, record_count, status, error_message, executed_at)
VALUES (?, ?, ?, ?, ?)
''', (task_id, 0, 'error', error_msg, local_now))
conn.commit()
finally:
conn.close()
def load_scheduler_jobs():
"""加载所有监控任务到调度器(只调度已发布的任务)"""
for job in scheduler.get_jobs():
if job.id.startswith('monitor_'):
scheduler.remove_job(job.id)
with get_db_ctx() as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, frequency FROM monitor_tasks WHERE status = 1 AND is_published = 1')
tasks = cursor.fetchall()
freq_map = {
'1min': 60,
'5min': 300,
'10min': 600,
'30min': 1800,
'1hour': 3600,
'4hour': 14400
}
for task in tasks:
seconds = freq_map.get(task['frequency'], 300)
scheduler.add_job(
id=f"monitor_{task['id']}",
func=execute_monitor_task,
args=[task['id']],
trigger='interval',
seconds=seconds,
replace_existing=True
)
@app.route('/api/monitor/tasks/<int:task_id>/run', methods=['POST'])
@login_required
def run_task_now(task_id):
"""立即执行监控任务"""
execute_monitor_task(task_id)
return jsonify({'success': True, 'message': '任务已执行'})
# 初始化标记(模块级变量,确保只执行一次)
_scheduler_loaded = False
@app.before_request
def before_first_request():
"""应用启动时初始化"""
global _scheduler_loaded
if not _scheduler_loaded:
init_db()
load_scheduler_jobs()
_scheduler_loaded = True
if __name__ == '__main__':
init_db()
load_scheduler_jobs()
app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)
|
-
免费评分
-
查看全部评分
|