吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1401|回复: 30
收起左侧

[Python 原创] MySQL 数据库实时监控系统

  [复制链接]
phantomxjc 发表于 2026-6-4 10:24

【Python实战】自己动手做一个 MySQL 数据库实时监控系统,附部分源码

前言

本文分享一个我在实际工作中开发并长期使用的运维小工具——不动产共享监控系统。项目用 Python + Flask 写的,适合有数据库运维需求的同学,也适合想学 Flask 全栈的新人练手。代码量不大,逻辑清晰,直接上效果图。


一、先看效果

1.1 监控大屏(首页)

5c649005-4157-45d1-af29-f212ddae82cb.png

整体采用浅色主题,蓝色点缀,布局如下:

  • 左侧:实例导航树,可按数据库实例 → 数据库 → 表三级展开筛选
  • 顶部卡片:实时展示 监控任务总数 / 今日成功次数 / 今日异常次数 / 最近同步时间
  • 中部柱状图:今日各表新增记录数,自动按数据库分组着色,支持自定义排序
  • 中部折线图:支持选择任意监控任务,查看近7天的记录数趋势
  • 底部表格:今日所有监控结果明细,含执行状态、记录数、执行时间等字段

1.2 后台管理界面

a1c8e510-c655-4bcc-9258-0e05d05dd25b.png

后台分为以下几个管理模块:

Tab 功能
监控任务 创建/编辑/发布/删除监控任务
数据库连接 管理 MySQL 实例连接信息
数据库别名 为数据库/实例设置展示别名
SQL 配置 配置自定义查询 SQL
图表排序 拖拽调整柱状图各表显示顺序

二、主要功能介绍

2.1 多实例、多数据库监控

系统支持同时对多个 MySQL 实例进行监控,每个实例下可以监控多个数据库、多张表。连接信息集中在后台管理,无需改代码。

852c47d2-ec56-456d-b6be-8948facd4a15.png

连接信息字段包括:实例名称 / 别名 / 主机 / 端口 / 用户名 / 密码 / 默认数据库,添加后可立即测试连通性。

2.2 灵活的监控任务配置

每个监控任务绑定一张表,支持以下配置:

  • 监控频率:5分钟 / 15分钟 / 30分钟 / 1小时 / 每天
  • 查询模式:系统自动生成 COUNT(*) 查询,也支持自定义 SQL(比如带 WHERE 条件的统计)
  • 表别名:给业务表取一个友好的展示名称,在大屏上显示
  • 发布控制:任务创建后默认不发布,发布后才会出现在监控大屏

同一张表可以创建多个不同条件的监控任务,互不干扰。
edbabf9e-b6d0-47f5-b2d0-2f8fe3a49bbc.png

2.3 今日数据可视化

柱状图支持按数据库自动分组着色,8种颜色循环分配,相同数据库下的表使用同色系,视觉上一眼能区分数据来源。

  • 异常任务(status=error)显示为红色
  • 今日未运行的任务(pending)显示为浅灰色
  • 图表下方附有颜色图例,标注每种颜色对应的数据库名

折线图支持从下拉框选择任意已发布任务,查看近7天该表的记录数历史走势,帮助判断数据流入是否正常。

2.4 图表排序管理

在后台管理的「图表排序」Tab 中,可以调整柱状图各表的显示顺序:

  • 上移 / 下移:逐步微调位置
  • 置顶:一键把某张表移到第一位
  • 保存后前台立即生效

2.5 最近同步状态

首页顶部卡片「最近同步」会实时显示距上次成功同步的时间差,格式如下:

  • 刚刚 — 1分钟内
  • 3分钟前 — 3-59分钟
  • 2小时前 — 1-23小时
  • 1天前 — 超过1天

2.6 自动定时调度

系统基于 APScheduler 实现定时任务调度,所有已发布任务按配置的频率自动执行,无需手动触发。执行结果(成功/失败/记录数/错误信息)自动写入数据库,历史记录完整保留。


三、技术栈

层次 技术
后端框架 Python 3.12 + Flask 3.x
任务调度 Flask-APScheduler
数据存储 SQLite3(本地文件,零依赖)
目标数据库 MySQL / MariaDB(通过 PyMySQL 连接)
前端图表 ECharts 5.4.3
前端样式 原生 CSS 变量 + 自研组件,无前端框架依赖
认证 Session + SHA256 密码哈希

整个项目无需前端构建工具,模板直接用 Jinja2 渲染,部署只需要一个 python app.py,非常适合内网环境。


四、项目结构

db_monitor/
├── app.py              # 主程序(路由 + API + 调度器)
├── monitor.db          # SQLite 数据库(自动创建)
├── requirements.txt    # 依赖列表
└── templates/
    ├── base.html       # 公共布局模板(侧边栏 + 头部 + CSS变量)
    ├── login.html      # 登录页
    ├── index.html      # 监控大屏(首页)
    └── admin.html      # 后台管理界面

代码量:app.py 约 1400 行,4个模板文件合计约 3000 行,逻辑清晰分层。


五、数据库设计

系统使用 SQLite,共 5 张表:

-- 用户表
users (id, username, password, created_at)

-- MySQL 实例连接配置
db_connections (id, name, alias, host, port, username, password, database_name, status)

-- 数据库别名(实例 → 数据库 两级别名)
db_aliases (id, connection_id, db_name, alias)

-- 监控任务
monitor_tasks (id, name, db_connection_id, database_name, table_name,
               table_alias, check_condition, sql_query, frequency,
               status, is_published, sort_order, last_run_at, last_result)

-- 监控执行结果(历史记录)
monitor_results (id, task_id, record_count, status, error_message, executed_at)

数据库会在首次启动时自动创建,并自动执行字段迁移(ALTER TABLE ADD COLUMN IF NOT EXISTS),升级无需手动操作。


六、部署方法

6.1 环境要求

  • Python 3.9+
  • pip 安装以下依赖:
flask
flask-apscheduler
pymysql

一键安装:

pip install -r requirements.txt

6.2 启动服务

cd db_monitor
python app.py

默认监听 0.0.0.0:5000,浏览器访问 http://localhost:5000 即可。

默认账号密码:admin / admin123(进后台后请及时修改)

6.3 首次配置步骤

  1. 登录后台 → 数据库连接 → 新增一个 MySQL 实例,测试连通性
  2. 进入 监控任务 → 新建任务,选择目标表和监控频率
  3. 任务默认为「未发布」状态,点击发布后才会出现在监控大屏
  4. 可在 数据库别名 为实例和数据库设置展示友好的中文名称
  5. 图表排序 中调整柱状图各表显示顺序

七、几个细节说明

为什么用 SQLite 而不是 MySQL?

监控系统本身的数据量极小(任务配置 + 结果记录),SQLite 文件数据库完全够用,并且:

  • 无需额外安装数据库服务
  • 数据文件可以直接备份复制
  • 内网部署简单,无端口暴露风险

为什么自研 CSS,不用 Bootstrap?

项目内网使用,要求无外网依赖,所有资源都通过 CDN 的 ECharts 是唯一例外(也可以改成本地引入)。纯原生 CSS 变量实现主题体系,改一个颜色变量全局生效。

APScheduler 的坑

Flask 开发模式下默认开启 use_reloader=True,会启动两个进程,导致定时任务跑两遍。建议生产部署时用:

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)

八、后续计划

  • [ ] 支持邮件/企微告警(记录数异常时推送通知)
  • [ ] 新增数据量趋势预警(基于历史均值自动判断异常)
  • [ ] 支持 PostgreSQL / Oracle 等其他数据库类型
  • [ ] 增加数据导出功能(Excel / CSV)
  • [ ] 支持多用户权限管理

九、写在最后

这个工具从最初的"临时脚本"一路迭代成了现在的完整系统,主要解决的痛点就是:在多个 MySQL 实例之间,实时知道业务数据有没有在正常流入

适合场景:数据同步任务监控 / ETL 管道状态观察 / 接口数据写入验证 / 日常数据质量检查。

源码已整理,如有需要可以回复帖子获取。

有问题欢迎在评论区留言,看到必回 🙌




[Python] 纯文本查看 复制代码
"""
MySQL数据库监控系统
Flask + SQLite + APScheduler
"""

from flask import Flask, render_template, request, jsonify, session, redirect, url_for, flash
from flask_apscheduler import APScheduler
from datetime import datetime, timedelta
import sqlite3
import json
import hashlib
import pymysql
import threading
import time
from functools import wraps

def now_local():
    """返回本地时间字符串(北京时间),格式 YYYY-MM-DD HH:MM:SS"""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def today_local():
    """返回本地今日日期字符串(北京时间),格式 YYYY-MM-DD"""
    return datetime.now().strftime('%Y-%m-%d')

app = Flask(__name__)
app.secret_key = 'db_monitor_secret_key_2024'

# 定时任务配置
class Config:
    SCHEDULER_API_ENABLED = True
    SCHEDULER_TIMEZONE = 'Asia/Shanghai'

app.config.from_object(Config)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

DATABASE = 'monitor.db'

def get_db():
    """获取数据库连接"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn

import contextlib

@contextlib.contextmanager
def get_db_ctx():
    """数据库连接上下文管理器,自动关闭连接"""
    conn = get_db()
    try:
        yield conn
    finally:
        conn.close()

def init_db():
    """初始化数据库表"""
    conn = get_db()
    cursor = conn.cursor()

    # 用户表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # 数据库连接配置表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS db_connections (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            alias TEXT,
            db_type TEXT DEFAULT 'mysql',
            host TEXT NOT NULL,
            port INTEGER DEFAULT 3306,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            database_name TEXT,
            status INTEGER DEFAULT 1,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # 监控任务表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS monitor_tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            db_connection_id INTEGER NOT NULL,
            database_name TEXT NOT NULL,
            table_name TEXT NOT NULL,
            table_alias TEXT,
            check_condition TEXT,
            sql_query TEXT,
            frequency TEXT DEFAULT '5min',
            status INTEGER DEFAULT 1,
            last_run_at TIMESTAMP,
            last_result INTEGER DEFAULT 0,
            last_error TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (db_connection_id) REFERENCES db_connections(id)
        )
    ''')

    # 监控结果表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS monitor_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER NOT NULL,
            record_count INTEGER DEFAULT 0,
            status TEXT DEFAULT 'success',
            error_message TEXT,
            executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (task_id) REFERENCES monitor_tasks(id)
        )
    ''')

    # 兼容旧数据库:添加必要字段
    cursor.execute("PRAGMA table_info(monitor_tasks)")
    columns = [col[1] for col in cursor.fetchall()]
    if 'sql_query' not in columns:
        cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN sql_query TEXT')

    cursor.execute("PRAGMA table_info(monitor_tasks)")
    col_info = {col[1]: col for col in cursor.fetchall()}
    if 'last_result' not in col_info:
        cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN last_result INTEGER DEFAULT 0')
    if 'is_published' not in col_info:
        cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN is_published INTEGER DEFAULT 0')
    if 'table_alias' not in col_info:
        cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN table_alias TEXT')
    if 'sort_order' not in col_info:
        cursor.execute('ALTER TABLE monitor_tasks ADD COLUMN sort_order INTEGER DEFAULT 0')

    # 兼容旧 db_connections:添加 alias 字段
    cursor.execute("PRAGMA table_info(db_connections)")
    conn_col_info = {col[1]: col for col in cursor.fetchall()}
    if 'alias' not in conn_col_info:
        cursor.execute('ALTER TABLE db_connections ADD COLUMN alias TEXT')

    # 数据库别名表(实例下的数据库级别别名)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS db_aliases (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            connection_id INTEGER NOT NULL,
            db_name TEXT NOT NULL,
            alias TEXT DEFAULT '',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (connection_id) REFERENCES db_connections(id),
            UNIQUE(connection_id, db_name)
        )
    ''')

    # 创建默认管理员账号
    default_pass = hashlib.sha256('admin123'.encode()).hexdigest()
    cursor.execute('''
        INSERT OR IGNORE INTO users (username, password)
        VALUES (?, ?)
    ''', ('admin', default_pass))

    conn.commit()
    conn.close()

def login_required(f):
    """登录验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            if request.is_json:
                return jsonify({'success': False, 'message': '请先登录'}), 401
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function

# ==================== 页面路由 ====================

@app.route('/')
def index():
    """主界面 - 监控结果展示"""
    return render_template('index.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    """登录页面"""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        conn = get_db()
        cursor = conn.cursor()
        hashed_pass = hashlib.sha256(password.encode()).hexdigest()

        user = cursor.execute(
            'SELECT * FROM users WHERE username = ? AND password = ?',
            (username, hashed_pass)
        ).fetchone()
        conn.close()

        if user:
            session['user_id'] = user['id']
            session['username'] = user['username']
            return redirect(url_for('admin'))
        else:
            flash('用户名或密码错误', 'error')

    return render_template('login.html')

@app.route('/logout')
def logout():
    """退出登录"""
    session.clear()
    return redirect(url_for('login'))

@app.route('/admin')
@login_required
def admin():
    """后台管理界面"""
    return render_template('admin.html')

# ==================== API接口 ====================

@app.route('/api/monitor/results')
def get_monitor_results():
    """获取监控结果列表"""
    with get_db_ctx() as conn:
        cursor = conn.cursor()

        # 预加载数据库别名
        cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
        db_alias_map = {}
        for r in cursor.fetchall():
            db_alias_map[(r['connection_id'], r['db_name'])] = r['alias'] or ''

        # 获取今天的结果(只显示已发布且未删除的任务)
        cursor.execute('''
            SELECT
                mr.id,
                mt.id as task_id,
                mt.db_connection_id,
                dc.name as instance_name,
                CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
                mt.database_name,
                mt.table_name,
                CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                mt.table_alias,
                dc.alias as instance_alias,
                mr.record_count,
                mr.status,
                mr.error_message,
                mr.executed_at,
                mt.sql_query
            FROM monitor_results mr
            JOIN monitor_tasks mt ON mr.task_id = mt.id
            JOIN db_connections dc ON mt.db_connection_id = dc.id
            WHERE substr(mr.executed_at, 1, 10) = ?
            AND mt.status = 1
            AND mt.is_published = 1
            ORDER BY mr.executed_at DESC
            LIMIT 200
        ''', (today_local(),))

        results = []
        for row in cursor.fetchall():
            executed_at = row['executed_at']
            # 查找数据库别名
            db_alias = db_alias_map.get((row['db_connection_id'], row['database_name']), '')
            results.append({
                'id': row['id'],
                'task_id': row['task_id'],
                'instance_name': row['instance_name'],
                'instance_display': row['instance_display'],
                'instance_alias': row['instance_alias'] or '',
                'database_name': row['database_name'],
                'database_display': db_alias if db_alias else row['database_name'],
                'database_alias': db_alias,
                'table_name': row['table_name'],
                'table_display': row['table_display'],
                'table_alias': row['table_alias'] or '',
                'record_count': row['record_count'] if row['record_count'] is not None else 0,
                'status': row['status'],
                'error_message': row['error_message'],
                'executed_at': executed_at,
                'sql_query': row['sql_query']
            })

    return jsonify({'success': True, 'data': results})

@app.route('/api/monitor/instances')
def get_monitor_instances():
    """获取监控实例列表(用于左侧导航树)"""
    with get_db_ctx() as conn:
        cursor = conn.cursor()

        # 后台管理:显示所有任务(包括未发布的)
        include_unpublished = request.args.get('include_unpublished', '0') == '1'

        # 预加载数据库别名
        cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
        db_alias_map = {}
        for r in cursor.fetchall():
            db_alias_map[(r['connection_id'], r['db_name'])] = r['alias'] or ''

        if include_unpublished:
            # 后台管理 - 显示所有任务
            cursor.execute('''
                SELECT DISTINCT
                    dc.id as connection_id,
                    dc.name as instance_name,
                    CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
                    dc.alias as instance_alias,
                    mt.database_name,
                    mt.table_name,
                    CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                    mt.table_alias,
                    mt.id as task_id,
                    mt.status as task_status,
                    mt.is_published,
                    mt.last_result,
                    mt.last_run_at,
                    mt.sql_query
                FROM db_connections dc
                LEFT JOIN monitor_tasks mt ON dc.id = mt.db_connection_id AND mt.status = 1
                WHERE dc.status = 1
                ORDER BY dc.name, mt.database_name, mt.table_name
            ''')
        else:
            # 监控面板 - 只显示已发布任务
            cursor.execute('''
                SELECT DISTINCT
                    dc.id as connection_id,
                    dc.name as instance_name,
                    CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
                    dc.alias as instance_alias,
                    mt.database_name,
                    mt.table_name,
                    CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                    mt.table_alias,
                    mt.id as task_id,
                    mt.status as task_status,
                    mt.is_published,
                    mt.last_result,
                    mt.last_run_at,
                    mt.sql_query
                FROM db_connections dc
                LEFT JOIN monitor_tasks mt ON dc.id = mt.db_connection_id AND mt.status = 1 AND mt.is_published = 1
                WHERE dc.status = 1
                ORDER BY dc.name, mt.database_name, mt.table_name
            ''')

        rows = cursor.fetchall()

        instances = {}
        for row in rows:
            inst_name = row['instance_name']
            db_name = row['database_name']

            if inst_name not in instances:
                instances[inst_name] = {
                    'id': row['connection_id'],
                    'name': inst_name,
                    'display': row['instance_display'],
                    'alias': row['instance_alias'] or '',
                    'databases': {}
                }

            if db_name and db_name not in instances[inst_name]['databases']:
                db_alias = db_alias_map.get((row['connection_id'], db_name), '')
                instances[inst_name]['databases'][db_name] = {
                    'name': db_name,
                    'alias': db_alias,
                    'display': db_alias if db_alias else db_name,
                    'tables': []
                }

            if db_name and row['table_name']:
                instances[inst_name]['databases'][db_name]['tables'].append({
                    'task_id': row['task_id'],
                    'name': row['table_name'],
                    'display': row['table_display'],
                    'alias': row['table_alias'] or '',
                    'status': row['task_status'],
                    'is_published': row['is_published'] if row['is_published'] is not None else 0,
                    'last_result': row['last_result'] if row['last_result'] is not None else 0,
                    'last_run_at': row['last_run_at'],
                    'sql_query': row['sql_query']
                })

    return jsonify({'success': True, 'data': list(instances.values())})

@app.route('/api/monitor/summary')
def get_monitor_summary():
    """获取监控汇总统计"""
    with get_db_ctx() as conn:
        cursor = conn.cursor()

        # 统计有已发布监控任务的数据库连接数(实例数)
        cursor.execute('''
            SELECT COUNT(DISTINCT db_connection_id) as instance_count
            FROM monitor_tasks WHERE status = 1 AND is_published = 1
        ''')
        instance_stats = cursor.fetchone()

        cursor.execute('''
            SELECT
                COUNT(*) as total_tasks,
                SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as active_tasks,
                SUM(CASE WHEN last_result > 0 THEN 1 ELSE 0 END) as has_data_tasks
            FROM monitor_tasks
            WHERE status = 1
        ''')
        task_stats = cursor.fetchone()

        cursor.execute('''
            SELECT
                COUNT(*) as today_records,
                SUM(mr.record_count) as total_records
            FROM monitor_results mr
            JOIN monitor_tasks mt ON mr.task_id = mt.id
            WHERE substr(mr.executed_at, 1, 10) = ?
            AND mt.status = 1
            AND mt.is_published = 1
        ''', (today_local(),))
        today_stats = cursor.fetchone()

        # 最近一次成功同步时间
        cursor.execute('''
            SELECT mr.executed_at
            FROM monitor_results mr
            JOIN monitor_tasks mt ON mr.task_id = mt.id
            WHERE mr.status = 'success'
            AND mt.status = 1
            AND mt.is_published = 1
            ORDER BY mr.executed_at DESC
            LIMIT 1
        ''')
        last_sync_row = cursor.fetchone()
        last_sync_at = last_sync_row['executed_at'] if last_sync_row else None

        cursor.execute('''
            SELECT COUNT(*) as error_count
            FROM monitor_results mr
            JOIN monitor_tasks mt ON mr.task_id = mt.id
            WHERE mr.status = 'error' AND substr(mr.executed_at, 1, 10) = ?
            AND mt.status = 1
            AND mt.is_published = 1
        ''', (today_local(),))
        error_stats = cursor.fetchone()

    return jsonify({
        'success': True,
        'data': {
            'total_instances': instance_stats['instance_count'] or 0,
            'total_tasks': task_stats['total_tasks'] or 0,
            'active_tasks': task_stats['active_tasks'] or 0,
            'has_data_tasks': task_stats['has_data_tasks'] or 0,
            'today_records': today_stats['today_records'] or 0,
            'today_total_records': today_stats['total_records'] or 0,
            'today_errors': error_stats['error_count'] or 0,
            'last_sync_at': last_sync_at
        }
    })

@app.route('/api/monitor/chart-data')
def get_chart_data():
    """获取图表数据:近N天趋势 + 今日柱状图"""
    # 支持 days=7 或 days=30
    try:
        n_days = int(request.args.get('days', 7))
    except (TypeError, ValueError):
        n_days = 7
    n_days = max(7, min(30, n_days))

    # 支持按 task_id 过滤趋势图
    task_id_filter = request.args.get('task_id')

    days = [(datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d') for i in range(n_days - 1, -1, -1)]

    with get_db_ctx() as conn:
        cursor = conn.cursor()

        # 预加载数据库别名(用于图表 label)
        cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
        db_alias_map_chart = {}
        for r in cursor.fetchall():
            db_alias_map_chart[(r['connection_id'], r['db_name'])] = r['alias'] or ''

        # 构建趋势查询(支持 task_id 过滤)
        trend_where = "WHERE substr(mr.executed_at, 1, 10) >= ? AND mt.status = 1 AND mt.is_published = 1"
        trend_params = [days[0]]
        if task_id_filter:
            trend_where += " AND mt.id = ?"
            trend_params.append(int(task_id_filter))

        cursor.execute(f"""
            SELECT substr(mr.executed_at, 1, 10) as date,
                   mt.id as task_id,
                   mt.database_name,
                   mt.table_name,
                   CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                   dc.id as connection_id,
                   dc.name as instance_name,
                   CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
                   COALESCE(SUM(mr.record_count), 0) as total_count
            FROM monitor_results mr
            JOIN monitor_tasks mt ON mr.task_id = mt.id
            JOIN db_connections dc ON mt.db_connection_id = dc.id
            {trend_where}
            GROUP BY substr(mr.executed_at, 1, 10), mt.id
            ORDER BY date ASC
        """, trend_params)
        rows = cursor.fetchall()
        table_map = {}
        for row in rows:
            key = str(row['task_id'])
            db_alias = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
            db_display = db_alias if db_alias else row['database_name']
            label = db_display + ' · ' + row['table_display']
            if key not in table_map:
                table_map[key] = {'label': label, 'data': {d: 0 for d in days}}
            if row['date'] in table_map[key]['data']:
                table_map[key]['data'][row['date']] = int(row['total_count'] or 0)

        # 今日柱状图:以全部已发布任务为基础,LEFT JOIN 今日最新结果
        # 确保今日还没跑的任务也显示(count=0),不遗漏任何任务
        cursor.execute("""
            SELECT mt.id as task_id, mt.table_name, mt.sort_order,
                   mt.database_name,
                   CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                   dc.id as connection_id,
                   dc.name as instance_name,
                   CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display,
                   latest.record_count, latest.status, latest.executed_at
            FROM monitor_tasks mt
            JOIN db_connections dc ON mt.db_connection_id = dc.id
            LEFT JOIN (
                SELECT mr.task_id,
                       mr.record_count,
                       mr.status,
                       mr.executed_at
                FROM monitor_results mr
                INNER JOIN (
                    SELECT task_id, MAX(executed_at) as max_at
                    FROM monitor_results
                    WHERE substr(executed_at, 1, 10) = ?
                    GROUP BY task_id
                ) sub ON mr.task_id = sub.task_id AND mr.executed_at = sub.max_at
            ) latest ON latest.task_id = mt.id
            WHERE mt.status = 1
            AND mt.is_published = 1
            ORDER BY mt.sort_order ASC, mt.id ASC
        """, (today_local(),))
        today_rows = cursor.fetchall()
        seen = set()
        today_bars = []
        all_tasks = []  # 用于前端构建下拉列表
        for row in today_rows:
            key = str(row['task_id'])
            if key not in seen:
                seen.add(key)
                db_alias_bar = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
                db_display_bar = db_alias_bar if db_alias_bar else row['database_name']
                today_bars.append({
                    'task_id': row['task_id'],
                    'name': row['table_display'],
                    'raw_name': row['table_name'],
                    'db_display': db_display_bar,
                    'instance': row['instance_display'],
                    'sort_order': row['sort_order'] if row['sort_order'] is not None else 0,
                    'count': int(row['record_count'] or 0),
                    # LEFT JOIN 时 status/time 可能为 None(任务今日未运行)
                    'status': row['status'] if row['status'] else 'pending',
                    'time': row['executed_at'] if row['executed_at'] else None
                })

        # 获取所有已发布任务(用于下拉选择)
        cursor.execute("""
            SELECT mt.id, mt.database_name, mt.table_name,
                   CASE WHEN mt.table_alias IS NOT NULL AND mt.table_alias != '' THEN mt.table_alias ELSE mt.table_name END as table_display,
                   dc.id as connection_id,
                   CASE WHEN dc.alias IS NOT NULL AND dc.alias != '' THEN dc.alias ELSE dc.name END as instance_display
            FROM monitor_tasks mt
            JOIN db_connections dc ON mt.db_connection_id = dc.id
            WHERE mt.status = 1 AND mt.is_published = 1
            ORDER BY dc.name, mt.database_name, mt.table_name
        """)
        for row in cursor.fetchall():
            db_alias = db_alias_map_chart.get((row['connection_id'], row['database_name']), '')
            db_display = db_alias if db_alias else row['database_name']
            all_tasks.append({
                'task_id': row['id'],
                'label': db_display + ' · ' + row['table_display']
            })

    series = [{'name': v['label'], 'task_id': k, 'data': [v['data'][d] for d in days]} for k, v in table_map.items()]
    return jsonify({'success': True, 'data': {
        'days': [d[5:] for d in days],
        'series': series,
        'today_bars': today_bars,
        'all_tasks': all_tasks
    }})

# 数据库连接管理API
@app.route('/api/db/connections', methods=['GET', 'POST'])
@login_required
def db_connections():
    """数据库连接管理"""
    if request.method == 'GET':
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM db_connections WHERE status = 1 ORDER BY id DESC')
            connections = []
            for row in cursor.fetchall():
                connections.append({
                    'id': row['id'],
                    'name': row['name'],
                    'alias': row['alias'] or '',
                    'display': row['alias'] if row['alias'] else row['name'],
                    'db_type': row['db_type'],
                    'host': row['host'],
                    'port': row['port'],
                    'username': row['username'],
                    'database_name': row['database_name'],
                    'status': row['status']
                })
        return jsonify({'success': True, 'data': connections})

    elif request.method == 'POST':
        data = request.json
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO db_connections (name, alias, db_type, host, port, username, password, database_name)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                data.get('name'),
                data.get('alias', ''),
                data.get('db_type', 'mysql'),
                data.get('host'),
                data.get('port', 3306),
                data.get('username'),
                data.get('password'),
                data.get('database_name')
            ))
            conn.commit()
        return jsonify({'success': True, 'message': '数据库连接添加成功'})

@app.route('/api/db/connections/<int:conn_id>', methods=['PUT', 'DELETE'])
@login_required
def db_connection_detail(conn_id):
    """单个数据库连接管理"""
    if request.method == 'PUT':
        data = request.json

        # 快捷别名更新:只传 alias 字段时,只更新 alias
        if 'alias' in data and 'host' not in data:
            with get_db_ctx() as conn:
                cursor = conn.cursor()
                cursor.execute('UPDATE db_connections SET alias = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
                               (data.get('alias', ''), conn_id))
                conn.commit()
            return jsonify({'success': True, 'message': '别名更新成功'})

        with get_db_ctx() as conn:
            cursor = conn.cursor()
            if data.get('password'):
                cursor.execute('''
                    UPDATE db_connections
                    SET name = ?, alias = ?, host = ?, port = ?, username = ?, password = ?, database_name = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (
                    data.get('name'),
                    data.get('alias', ''),
                    data.get('host'),
                    data.get('port'),
                    data.get('username'),
                    data.get('password'),
                    data.get('database_name'),
                    conn_id
                ))
            else:
                cursor.execute('''
                    UPDATE db_connections
                    SET name = ?, alias = ?, host = ?, port = ?, username = ?, database_name = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (
                    data.get('name'),
                    data.get('alias', ''),
                    data.get('host'),
                    data.get('port'),
                    data.get('username'),
                    data.get('database_name'),
                    conn_id
                ))
            conn.commit()
        return jsonify({'success': True, 'message': '数据库连接更新成功'})

    elif request.method == 'DELETE':
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            cursor.execute('UPDATE db_connections SET status = 0 WHERE id = ?', (conn_id,))
            conn.commit()
        return jsonify({'success': True, 'message': '数据库连接已删除'})

@app.route('/api/db/database-alias', methods=['GET', 'PUT'])
@login_required
def database_alias():
    """数据库别名管理(实例下的数据库级别别名)"""
    if request.method == 'GET':
        conn_id = request.args.get('connection_id', type=int)
        db_name = request.args.get('db_name', '')
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            if conn_id and db_name:
                cursor.execute('SELECT alias FROM db_aliases WHERE connection_id = ? AND db_name = ?', (conn_id, db_name))
                row = cursor.fetchone()
                return jsonify({'success': True, 'alias': row['alias'] if row else ''})
            else:
                cursor.execute('SELECT connection_id, db_name, alias FROM db_aliases')
                aliases = [{'connection_id': r['connection_id'], 'db_name': r['db_name'], 'alias': r['alias'] or ''} for r in cursor.fetchall()]
                return jsonify({'success': True, 'data': aliases})
    elif request.method == 'PUT':
        data = request.json
        conn_id = data.get('connection_id')
        db_name = data.get('db_name')
        alias = data.get('alias', '')
        if not conn_id or not db_name:
            return jsonify({'success': False, 'message': '缺少 connection_id 或 db_name'})
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO db_aliases (connection_id, db_name, alias, updated_at)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(connection_id, db_name) DO UPDATE SET alias = ?, updated_at = CURRENT_TIMESTAMP
            ''', (conn_id, db_name, alias, alias))
            conn.commit()
        return jsonify({'success': True, 'message': '数据库别名更新成功'})

@app.route('/api/db/test', methods=['POST'])
@login_required
def test_db_connection():
    """测试数据库连接"""
    data = request.json

    try:
        if data.get('db_type') in ('mysql', None, ''):
            connection = pymysql.connect(
                host=data.get('host'),
                port=int(data.get('port', 3306)),
                user=data.get('username'),
                password=data.get('password'),
                database=data.get('database_name') or None,
                connect_timeout=5
            )
            connection.close()
            return jsonify({'success': True, 'message': '连接成功'})
        else:
            return jsonify({'success': False, 'message': '暂不支持该数据库类型'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'连接失败: {str(e)}'})

@app.route('/api/db/execute_sql', methods=['POST'])
@login_required
def execute_sql():
    """执行SQL查询(仅SELECT),返回 Dict 格式数据"""
    data = request.json
    conn_id = data.get('connection_id')
    sql = data.get('sql', '').strip()
    database_name = data.get('database_name')

    if not sql:
        return jsonify({'success': False, 'message': 'SQL语句不能为空'})
    if not sql.upper().startswith('SELECT'):
        return jsonify({'success': False, 'message': '仅支持 SELECT 查询语句'})

    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM db_connections WHERE id = ? AND status = 1', (conn_id,))
        db_conn = cursor.fetchone()

    if not db_conn:
        return jsonify({'success': False, 'message': '数据库连接不存在或已停用'})

    try:
        connection = pymysql.connect(
            host=db_conn['host'],
            port=db_conn['port'],
            user=db_conn['username'],
            password=db_conn['password'],
            database=database_name or None,
            connect_timeout=5,
            charset='utf8mb4'
        )
        # 使用 DictCursor,返回 dict 格式的 rows
        from pymysql.cursors import DictCursor
        with connection.cursor(DictCursor) as cursor:
            cursor.execute(sql)
            columns = list(cursor.description) if cursor.description else []
            col_names = [c[0] for c in columns]
            rows = cursor.fetchall()  # 每行都是 dict
            # 限制返回条数
            rows = rows[:100]
            row_count = len(rows)
            has_more = len(rows) == 100
        connection.close()
        return jsonify({
            'success': True,
            'message': f'查询成功,返回 {row_count} 条记录' + ('(已截断,仅显示前100条)' if has_more else ''),
            'data': {'columns': col_names, 'rows': rows, 'row_count': row_count}
        })
    except Exception as e:
        return jsonify({'success': False, 'message': f'SQL执行失败: {str(e)}'})

@app.route('/api/db/databases', methods=['POST'])
@login_required
def get_databases():
    """获取数据库列表"""
    data = request.json
    conn_id = data.get('connection_id')

    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM db_connections WHERE id = ?', (conn_id,))
        db_conn = cursor.fetchone()

    if not db_conn:
        return jsonify({'success': False, 'message': '数据库连接不存在'})

    try:
        connection = pymysql.connect(
            host=db_conn['host'],
            port=db_conn['port'],
            user=db_conn['username'],
            password=db_conn['password'],
            connect_timeout=5
        )

        with connection.cursor() as cursor:
            cursor.execute('SHOW DATABASES')
            databases = [row[0] for row in cursor.fetchall()]
            databases = [db for db in databases if db not in ['information_schema', 'mysql', 'performance_schema', 'sys']]

        connection.close()
        return jsonify({'success': True, 'data': databases})
    except Exception as e:
        return jsonify({'success': False, 'message': f'获取数据库列表失败: {str(e)}'})

@app.route('/api/db/tables', methods=['POST'])
@login_required
def get_tables():
    """获取表列表"""
    data = request.json
    conn_id = data.get('connection_id')
    database_name = data.get('database_name')

    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM db_connections WHERE id = ?', (conn_id,))
        db_conn = cursor.fetchone()

    if not db_conn:
        return jsonify({'success': False, 'message': '数据库连接不存在'})

    try:
        connection = pymysql.connect(
            host=db_conn['host'],
            port=db_conn['port'],
            user=db_conn['username'],
            password=db_conn['password'],
            database=database_name,
            connect_timeout=5
        )

        with connection.cursor() as cursor:
            cursor.execute('SHOW TABLES')
            tables = [row[0] for row in cursor.fetchall()]

        connection.close()
        return jsonify({'success': True, 'data': tables})
    except Exception as e:
        return jsonify({'success': False, 'message': f'获取表列表失败: {str(e)}'})

# 监控任务管理API
@app.route('/api/monitor/tasks', methods=['GET', 'POST'])
@login_required
def monitor_tasks():
    """监控任务管理"""
    if request.method == 'GET':
        with get_db_ctx() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT mt.*, dc.name as db_connection_name
                FROM monitor_tasks mt
                JOIN db_connections dc ON mt.db_connection_id = dc.id
                WHERE mt.status = 1
                ORDER BY dc.name, mt.database_name, mt.table_name
            ''')
            tasks = []
            for row in cursor.fetchall():
                tasks.append({
                    'id': row['id'],
                    'name': row['name'],
                    'db_connection_id': row['db_connection_id'],
                    'db_connection_name': row['db_connection_name'],
                    'database_name': row['database_name'],
                    'table_name': row['table_name'],
                    'table_alias': row['table_alias'] if row['table_alias'] is not None else '',
                    'check_condition': row['check_condition'],
                    'sql_query': row['sql_query'],
                    'frequency': row['frequency'],
                    'status': row['status'],
                    'is_published': row['is_published'] if row['is_published'] is not None else 0,
                    'last_run_at': row['last_run_at'],
                    'last_result': row['last_result'] if row['last_result'] is not None else 0,
                    'last_error': row['last_error'],
                    'sort_order': row['sort_order'] if row['sort_order'] is not None else 0
                })
        return jsonify({'success': True, 'data': tasks})

    elif request.method == 'POST':
        data = request.json
        tasks_data = data.get('tasks', [])

        if not tasks_data:
            return jsonify({'success': False, 'message': '没有要创建的任务'})

        with get_db_ctx() as conn:
            cursor = conn.cursor()

            # 允许同一张表创建多个不同监控任务,直接插入所有任务
            new_tasks = tasks_data

            # 插入新任务
            created_ids = []
            for task in new_tasks:
                cursor.execute('''
                    INSERT INTO monitor_tasks (name, db_connection_id, database_name, table_name, check_condition, sql_query, frequency)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', (
                    task.get('name'),
                    task.get('db_connection_id'),
                    task.get('database_name'),
                    task.get('table_name'),
                    task.get('check_condition'),
                    task.get('sql_query'),
                    task.get('frequency', '5min')
                ))
                created_ids.append(cursor.lastrowid)

            conn.commit()

        load_scheduler_jobs()

        # 立即执行新建的任务,让监控面板立刻能看到数据
        for task_id in created_ids:
            try:
                execute_monitor_task(task_id)
            except Exception:
                pass  # 忽略执行错误,不影响创建成功的结果

        msg = f'成功创建 {len(new_tasks)} 个监控任务'
        return jsonify({'success': True, 'message': msg})

@app.route('/api/monitor/tasks/<int:task_id>', methods=['GET', 'PUT', 'DELETE'])
@login_required
def monitor_task_detail(task_id):
    """单个监控任务管理"""
    conn = get_db()
    cursor = conn.cursor()

    if request.method == 'GET':
        cursor.execute('''
            SELECT mt.*, dc.name as db_connection_name
            FROM monitor_tasks mt
            JOIN db_connections dc ON mt.db_connection_id = dc.id
            WHERE mt.id = ?
        ''', (task_id,))

        row = cursor.fetchone()
        conn.close()

        if not row:
            return jsonify({'success': False, 'message': '任务不存在'})

        return jsonify({
            'success': True,
            'data': {
                'id': row['id'],
                'name': row['name'],
                'db_connection_id': row['db_connection_id'],
                'db_connection_name': row['db_connection_name'],
                'database_name': row['database_name'],
                'table_name': row['table_name'],
                'check_condition': row['check_condition'],
                'sql_query': row['sql_query'],
                'frequency': row['frequency'],
                'status': row['status'],
                'is_published': row['is_published'] if row['is_published'] is not None else 0,
                'last_run_at': row['last_run_at'],
                'last_result': row['last_result'] if row['last_result'] is not None else 0,
                'last_error': row['last_error']
            }
        })

    elif request.method == 'PUT':
        data = request.json
        cursor.execute('''
            UPDATE monitor_tasks
            SET check_condition = ?, sql_query = ?, frequency = ?, table_alias = ?
            WHERE id = ?
        ''', (
            data.get('check_condition'),
            data.get('sql_query'),
            data.get('frequency'),
            data.get('table_alias', ''),
            task_id
        ))
        conn.commit()
        conn.close()

        load_scheduler_jobs()

        return jsonify({'success': True, 'message': '监控任务更新成功'})

    elif request.method == 'DELETE':
        cursor.execute('UPDATE monitor_tasks SET status = 0 WHERE id = ?', (task_id,))
        conn.commit()
        conn.close()

        load_scheduler_jobs()

        return jsonify({'success': True, 'message': '监控任务已删除'})

@app.route('/api/monitor/tasks/<int:task_id>/publish', methods=['POST'])
@login_required
def publish_task(task_id):
    """发布任务到监控面板"""
    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('UPDATE monitor_tasks SET is_published = 1 WHERE id = ?', (task_id,))
        conn.commit()
    return jsonify({'success': True, 'message': '任务已发布到监控面板'})

@app.route('/api/monitor/tasks/<int:task_id>/unpublish', methods=['POST'])
@login_required
def unpublish_task(task_id):
    """撤销任务(从监控面板移除)"""
    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('UPDATE monitor_tasks SET is_published = 0 WHERE id = ?', (task_id,))
        conn.commit()
    return jsonify({'success': True, 'message': '任务已从监控面板撤销'})

@app.route('/api/monitor/tasks/batch-delete', methods=['POST'])
@login_required
def batch_delete_tasks():
    """批量删除监控任务"""
    data = request.json
    task_ids = data.get('task_ids', [])

    if not task_ids:
        return jsonify({'success': False, 'message': '未选择任务'})

    with get_db_ctx() as conn:
        cursor = conn.cursor()
        placeholders = ','.join(['?' for _ in task_ids])
        cursor.execute(f'UPDATE monitor_tasks SET status = 0 WHERE id IN ({placeholders})', task_ids)
        conn.commit()

    load_scheduler_jobs()

    return jsonify({'success': True, 'message': f'成功删除 {len(task_ids)} 个监控任务'})

@app.route('/api/monitor/tasks/batch-sort', methods=['POST'])
@login_required
def batch_sort_tasks():
    """批量更新任务排序权重"""
    data = request.json
    # [{task_id: 1, sort_order: 0}, {task_id: 2, sort_order: 1}, ...]
    orders = data.get('orders', [])
    if not orders:
        return jsonify({'success': False, 'message': '无排序数据'})
    with get_db_ctx() as conn:
        cursor = conn.cursor()
        for item in orders:
            cursor.execute('UPDATE monitor_tasks SET sort_order = ? WHERE id = ?',
                           (item.get('sort_order', 0), item.get('task_id')))
        conn.commit()
    return jsonify({'success': True, 'message': '排序已保存'})

# ==================== 定时任务执行 ====================

def execute_monitor_task(task_id):
    """执行单个监控任务"""
    conn = get_db()
    cursor = conn.cursor()

    cursor.execute('''
        SELECT mt.*, dc.host, dc.port, dc.username, dc.password
        FROM monitor_tasks mt
        JOIN db_connections dc ON mt.db_connection_id = dc.id
        WHERE mt.id = ? AND mt.status = 1
    ''', (task_id,))

    task = cursor.fetchone()
    if not task:
        conn.close()
        return

    try:
        # 连接数据库
        connection = pymysql.connect(
            host=task['host'],
            port=task['port'],
            user=task['username'],
            password=task['password'],
            database=task['database_name'],
            connect_timeout=10,
            charset='utf8mb4'
        )

        # 构建SQL:如果有自定义SQL直接用;否则用默认模板
        if task['sql_query'] and task['sql_query'].strip():
            sql = task['sql_query']
        else:
            # 默认SQL:统计今日数据
            check_condition = task['check_condition'] or ''
            if check_condition.strip():
                if 'where' in check_condition.lower():
                    sql = f"SELECT COUNT(*) FROM {task['table_name']} {check_condition}"
                else:
                    sql = f"SELECT COUNT(*) FROM {task['table_name']} WHERE {check_condition}"
            else:
                today = datetime.now().strftime('%Y-%m-%d')
                sql = f"SELECT COUNT(*) FROM {task['table_name']} WHERE DATE(created_at) = '{today}'"

        # 执行SQL,使用普通游标
        with connection.cursor() as cur:
            cur.execute(sql)
            result = cur.fetchone()
            # fetchone() 返回 (count,) 元组,result[0] 即为数量
            if result and len(result) > 0:
                count = int(result[0]) if result[0] is not None else 0
            else:
                count = 0

        connection.close()

        local_now = now_local()
        local_today = today_local()

        # 更新任务状态
        cursor.execute('''
            UPDATE monitor_tasks SET last_run_at = ?, last_result = ?, last_error = NULL
            WHERE id = ?
        ''', (local_now, count, task_id))

        # 更新今天的监控结果(如果已有今天的结果),否则插入新记录
        # 先检查今天是否已有该任务的执行记录(用本地日期比较)
        cursor.execute('''
            SELECT id FROM monitor_results
            WHERE task_id = ? AND substr(executed_at, 1, 10) = ?
        ''', (task_id, local_today))
        existing = cursor.fetchone()

        if existing:
            # 更新今天的记录
            cursor.execute('''
                UPDATE monitor_results
                SET record_count = ?, status = 'success', error_message = NULL, executed_at = ?
                WHERE id = ?
            ''', (count, local_now, existing['id']))
        else:
            # 插入新记录
            cursor.execute('''
                INSERT INTO monitor_results (task_id, record_count, status, executed_at)
                VALUES (?, ?, ?, ?)
            ''', (task_id, count, 'success', local_now))

        conn.commit()

    except Exception as e:
        error_msg = str(e)
        local_now = now_local()
        local_today = today_local()

        # 更新任务状态为错误
        cursor.execute('''
            UPDATE monitor_tasks SET last_run_at = ?, last_error = ?
            WHERE id = ?
        ''', (local_now, error_msg, task_id))

        # 更新今天的监控结果(如果已有今天的结果),否则插入新记录
        cursor.execute('''
            SELECT id FROM monitor_results
            WHERE task_id = ? AND substr(executed_at, 1, 10) = ?
        ''', (task_id, local_today))
        existing = cursor.fetchone()

        if existing:
            cursor.execute('''
                UPDATE monitor_results
                SET record_count = 0, status = 'error', error_message = ?, executed_at = ?
                WHERE id = ?
            ''', (error_msg, local_now, existing['id']))
        else:
            cursor.execute('''
                INSERT INTO monitor_results (task_id, record_count, status, error_message, executed_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (task_id, 0, 'error', error_msg, local_now))

        conn.commit()

    finally:
        conn.close()

def load_scheduler_jobs():
    """加载所有监控任务到调度器(只调度已发布的任务)"""
    for job in scheduler.get_jobs():
        if job.id.startswith('monitor_'):
            scheduler.remove_job(job.id)

    with get_db_ctx() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT id, frequency FROM monitor_tasks WHERE status = 1 AND is_published = 1')
        tasks = cursor.fetchall()

    freq_map = {
        '1min': 60,
        '5min': 300,
        '10min': 600,
        '30min': 1800,
        '1hour': 3600,
        '4hour': 14400
    }

    for task in tasks:
        seconds = freq_map.get(task['frequency'], 300)
        scheduler.add_job(
            id=f"monitor_{task['id']}",
            func=execute_monitor_task,
            args=[task['id']],
            trigger='interval',
            seconds=seconds,
            replace_existing=True
        )

@app.route('/api/monitor/tasks/<int:task_id>/run', methods=['POST'])
@login_required
def run_task_now(task_id):
    """立即执行监控任务"""
    execute_monitor_task(task_id)
    return jsonify({'success': True, 'message': '任务已执行'})

# 初始化标记(模块级变量,确保只执行一次)
_scheduler_loaded = False

@app.before_request
def before_first_request():
    """应用启动时初始化"""
    global _scheduler_loaded
    if not _scheduler_loaded:
        init_db()
        load_scheduler_jobs()
        _scheduler_loaded = True

if __name__ == '__main__':
    init_db()
    load_scheduler_jobs()
    app.run(debug=True, host='0.0.0.0', port=5000, use_reloader=False)


c781baa5-6bcb-4f65-b3d2-4d4f5fe2c29d.png

免费评分

参与人数 6吾爱币 +6 热心值 +4 收起 理由
wapj2900958 + 1 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
weidechan + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
xiaoye2 + 1 + 1 我很赞同!
lxj12328 + 1 + 1 谢谢@Thanks!
zhangdashuaibi + 1 + 1 谢谢@Thanks!
lcg2014 + 1 用心讨论,共获提升!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

ytfqifw 发表于 2026-6-4 10:33
沙发我座上了,但是图片加载好慢。
lengfeng82 发表于 2026-6-4 10:35
Mzhang2008 发表于 2026-6-4 10:36
lcg2014 发表于 2026-6-4 10:37
看不到截图,支持一下原创,能不能加上监控慢查询和锁?
gugouo163 发表于 2026-6-4 10:39
一直在看图片转圈圈
gugouo163 发表于 2026-6-4 10:41

论坛附件服务器挂了
 楼主| phantomxjc 发表于 2026-6-4 10:53
ytfqifw 发表于 2026-6-4 10:33
沙发我座上了,但是图片加载好慢。

貌似论坛的问题
apull 发表于 2026-6-4 11:11
图太慢了,看介绍不错,可以试一下。
hbu126 发表于 2026-6-4 11:12
牛掰,楼主高手啊
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-6-6 09:16

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表