[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import threading
import time
import os
import platform
import subprocess
import pygame
from pygame import mixer
import json
import configparser
import datetime
class IPMonitorApp:
def __init__(self, root):
self.root = root
self.root.title("IP在线检测与警报系统V1.2")
self.root.geometry("800x600")
self.root.resizable(True, True)
# 初始化变量
self.ip_list = []
self.monitoring = False
self.check_interval = 5 # 默认5秒检测一次
self.config_file = "ip_monitor_config.ini"
self.offline_logs = [] # 存储离线日志
self.startTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') #程序运行时间
self.startMonitoringTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') #开始检测时间
# 初始化声音系统
self.init_sound()
# 创建界面
self.create_widgets()
# 加载配置
self.load_config()
# 启动GUI更新线程
self.update_thread = None
# 绑定窗口关闭事件
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def init_sound(self):
"""初始化声音系统"""
try:
mixer.init()
# 创建默认警报声
self.create_default_alert_sound()
except:
messagebox.showwarning("声音警告", "无法初始化声音系统,警报功能可能无法正常工作")
def create_default_alert_sound(self):
"""创建默认警报声"""
# 这是一个简单的警报声,实际应用中可以使用外部音频文件
try:
# 尝试加载外部警报声文件
if os.path.exists("alert.wav"):
self.alert_sound = mixer.Sound("alert.wav")
else:
# 创建一个简单的蜂鸣声作为备用
import numpy as np
import wave
import struct
# 生成蜂鸣声
sample_rate = 44100
duration = 1.0 # 秒
frequency = 880 # Hz
# 生成声音数据
samples = []
for i in range(int(duration * sample_rate)):
sample = 0.5 * np.sin(2 * np.pi * frequency * i / sample_rate)
samples.append(sample)
# 转换为16位整数
samples = [int(s * 32767) for s in samples]
# 保存为临时文件
with wave.open("temp_alert.wav", "w") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(sample_rate)
f.writeframes(b''.join([struct.pack('<h', s) for s in samples]))
self.alert_sound = mixer.Sound("temp_alert.wav")
except:
# 如果声音创建失败,设置一个空的声音对象
self.alert_sound = None
def create_widgets(self):
"""创建GUI组件"""
# 主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 配置行列权重
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(2, weight=1)
# IP输入区域
ip_frame = ttk.LabelFrame(main_frame, text="IP地址管理", padding="5")
ip_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
ip_frame.columnconfigure(0, weight=1)
ttk.Label(ip_frame, text="IP地址:").grid(row=0, column=0, sticky=tk.W)
self.ip_entry = ttk.Entry(ip_frame)
self.ip_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(5, 5))
self.ip_entry.bind('<Return>', self.add_ip)
ttk.Button(ip_frame, text="添加", command=self.add_ip).grid(row=0, column=2, padx=(5, 0))
ttk.Button(ip_frame, text="删除选中", command=self.remove_ip).grid(row=0, column=3, padx=(5, 0))
ttk.Button(ip_frame, text="清空列表", command=self.clear_ips).grid(row=0, column=4, padx=(5, 0))
# IP列表
list_frame = ttk.Frame(main_frame)
list_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
list_frame.columnconfigure(0, weight=1)
list_frame.rowconfigure(0, weight=1)
columns = ("ip", "status", "last_check")
self.ip_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10)
# 设置列标题
self.ip_tree.heading("ip", text="IP地址")
self.ip_tree.heading("status", text="状态")
self.ip_tree.heading("last_check", text="最后检测时间")
# 设置列宽
self.ip_tree.column("ip", width=200)
self.ip_tree.column("status", width=100)
self.ip_tree.column("last_check", width=200)
# 添加滚动条
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.ip_tree.yview)
self.ip_tree.configure(yscrollcommand=scrollbar.set)
self.ip_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# 控制面板
control_frame = ttk.LabelFrame(main_frame, text="控制面板", padding="5")
control_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
control_frame.columnconfigure(1, weight=1)
ttk.Label(control_frame, text="检测间隔(秒):").grid(row=0, column=0, sticky=tk.W)
self.interval_var = tk.StringVar(value=str(self.check_interval))
interval_spin = ttk.Spinbox(control_frame, from_=1, to=60, textvariable=self.interval_var, width=10)
interval_spin.grid(row=0, column=1, sticky=tk.W, padx=(5, 0))
self.start_button = ttk.Button(control_frame, text="开始检测", command=self.start_monitoring)
self.start_button.grid(row=0, column=2, padx=(20, 5))
self.stop_button = ttk.Button(control_frame, text="停止检测", command=self.stop_monitoring, state=tk.DISABLED)
self.stop_button.grid(row=0, column=3, padx=(5, 0))
ttk.Button(control_frame, text="测试警报", command=self.test_alert).grid(row=0, column=4, padx=(20, 0))
# 配置管理按钮
config_frame = ttk.Frame(control_frame)
config_frame.grid(row=0, column=5, padx=(20, 0))
ttk.Button(config_frame, text="保存配置", command=self.save_config).grid(row=0, column=0, padx=(5, 0))
ttk.Button(config_frame, text="加载配置", command=self.load_config).grid(row=0, column=1, padx=(5, 0))
ttk.Button(config_frame, text="导出离线日志", command=self.export_offline_logs).grid(row=0, column=2,
padx=(5, 0))
# 日志区域
log_frame = ttk.LabelFrame(main_frame, text="日志", padding="5")
log_frame.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S))
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
self.log_text = tk.Text(log_frame, height=8, wrap=tk.WORD)
log_scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
self.log_text.configure(yscrollcommand=log_scrollbar.set)
self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
log_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
def add_ip(self, event=None):
"""添加IP到列表"""
ip = self.ip_entry.get().strip()
if not ip:
messagebox.showwarning("输入错误", "请输入IP地址")
return
# 简单的IP格式验证
if not self.is_valid_ip(ip):
messagebox.showwarning("输入错误", "请输入有效的IP地址")
return
# 检查是否已存在
for item in self.ip_tree.get_children():
if self.ip_tree.item(item, "values")[0] == ip:
messagebox.showinfo("提示", "该IP已存在于列表中")
return
# 添加到列表
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.ip_tree.insert("", tk.END, values=(ip, "未检测", current_time))
self.ip_entry.delete(0, tk.END)
self.log(f"添加IP: {ip}")
def remove_ip(self):
"""删除选中的IP"""
selected = self.ip_tree.selection()
if not selected:
messagebox.showinfo("提示", "请先选择要删除的IP")
return
for item in selected:
ip = self.ip_tree.item(item, "values")[0]
self.ip_tree.delete(item)
self.log(f"删除IP: {ip}")
def clear_ips(self):
"""清空IP列表"""
if messagebox.askyesno("确认", "确定要清空所有IP地址吗?"):
for item in self.ip_tree.get_children():
self.ip_tree.delete(item)
self.log("清空IP列表")
def is_valid_ip(self, ip):
"""简单验证IP地址格式"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
num = int(part)
if num < 0 or num > 255:
return False
return True
def start_monitoring(self):
"""开始检测IP状态"""
if not self.ip_tree.get_children():
messagebox.showwarning("警告", "请先添加要检测的IP地址")
return
try:
self.check_interval = int(self.interval_var.get())
if self.check_interval < 1:
raise ValueError
except ValueError:
messagebox.showwarning("输入错误", "检测间隔必须是1-60之间的整数")
return
self.monitoring = True
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
# 启动检测线程
self.monitor_thread = threading.Thread(target=self.monitor_ips, daemon=True)
self.monitor_thread.start()
self.startMonitoringTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.log("开始检测IP状态")
def stop_monitoring(self):
"""停止检测IP状态"""
self.monitoring = False
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.log("停止检测IP状态")
def monitor_ips(self):
"""监控IP状态的线程函数"""
while self.monitoring:
for item in self.ip_tree.get_children():
if not self.monitoring:
break
ip = self.ip_tree.item(item, "values")[0]
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
# 检测IP状态
is_online = self.ping_ip(ip)
# 更新UI
self.root.after(0, self.update_ip_status, item, ip, is_online, current_time)
# 如果不在线,播放警报
if not is_online:
self.root.after(0, self.play_alert, ip)
# 等待间隔时间
for i in range(self.check_interval * 10):
if not self.monitoring:
break
time.sleep(0.1)
def ping_ip(self, ip):
"""
检测IP是否在线
改进版:检查ping命令的输出内容,而不仅仅是返回码
同时隐藏控制台窗口
"""
try:
# 根据操作系统选择ping命令
param = "-n" if platform.system().lower() == "windows" else "-c"
# 准备子进程参数
ping_args = ["ping", param, "1", ip]
# 在Windows上使用CREATE_NO_WINDOW标志隐藏控制台窗口
if platform.system().lower() == "windows":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0 # SW_HIDE
# 执行ping命令,隐藏控制台窗口
result = subprocess.run(
ping_args,
capture_output=True,
text=True,
timeout=5,
startupinfo=startupinfo,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
# 在非Windows系统上,正常执行
result = subprocess.run(
ping_args,
capture_output=True,
text=True,
timeout=5
)
# 检查返回码和输出内容
if result.returncode == 0:
# 返回码为0表示ping命令本身执行成功
# 但我们需要检查输出内容,确保是真正的"回复"而不是"无法访问目标主机"
output = result.stdout.lower()
# 检查常见的表示失败的字符串
failure_indicators = [
"无法访问目标主机",
"destination host unreachable",
"request timed out",
"传输失败",
"general failure",
"ttl expired in transit"
]
# 如果输出中包含任何失败指示,则认为IP不在线
for indicator in failure_indicators:
if indicator in output.lower():
return False
# 检查是否有成功的回复
success_indicators = [
"回复",
"reply",
"bytes=",
"来自"
]
for indicator in success_indicators:
if indicator in output.lower():
return True
# 如果没有明确的成功或失败指示,使用返回码
return result.returncode == 0
else:
# 返回码不为0,肯定是不在线
return False
except subprocess.TimeoutExpired:
# 超时表示不在线
return False
except Exception:
# 其他异常也表示不在线
return False
def update_ip_status(self, item, ip, is_online, current_time):
"""更新IP状态显示"""
status = "在线" if is_online else "离线"
self.ip_tree.item(item, values=(ip, status, current_time))
# 更新日志
if is_online:
self.log(f"IP {ip} 在线")
else:
self.log(f" IP {ip} 离线", "warning")
# 添加到离线日志列表
self.add_offline_log(ip, current_time)
def add_offline_log(self, ip, timestamp):
"""添加离线日志到列表"""
log_entry = f"[{timestamp}] IP {ip} 离线"
self.offline_logs.append(log_entry)
def export_offline_logs(self):
"""导出离线日志到文件"""
if not self.offline_logs:
messagebox.showinfo("提示", "没有离线日志可导出")
return
# 生成默认文件名:当前时间
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
default_filename = f"离线日志_{current_time}.txt"
# 获取程序运行目录
app_dir = os.path.dirname(os.path.abspath(__file__))
default_path = os.path.join(app_dir, default_filename)
# 选择保存文件位置,使用默认路径和文件名
filename = filedialog.asksaveasfilename(
title="保存离线日志",
defaultextension=".txt",
initialfile=default_filename,
initialdir=app_dir,
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if not filename:
return # 用户取消了保存
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write("IP监控系统 - 离线日志报告\n")
f.write(f"程序运行时间: {self.startTime}\n")
f.write(f"最后一次开始检测时间: {self.startMonitoringTime}\n")
f.write(f"导出时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 50 + "\n\n")
for log in self.offline_logs:
f.write(log + "\n")
f.write(f"\n总计离线记录: {len(self.offline_logs)} 条\n")
self.log(f"离线日志已导出到: {filename}")
messagebox.showinfo("成功", f"离线日志已导出到:\n{filename}")
except Exception as e:
self.log(f"导出离线日志失败: {str(e)}", "error")
messagebox.showerror("错误", f"导出离线日志失败: {str(e)}")
def play_alert(self, ip):
"""播放警报声音"""
if self.alert_sound:
try:
self.alert_sound.play()
except:
# 如果播放失败,使用系统蜂鸣声
print("\a") # 系统蜂鸣声
def test_alert(self):
"""测试警报声音"""
self.play_alert("测试")
self.log("测试警报声音")
def log(self, message, level="info"):
"""添加日志"""
timestamp = time.strftime("%H:%M:%S")
if level == "warning":
log_entry = f"[{timestamp}] 警告: {message}\n"
else:
log_entry = f"[{timestamp}] {message}\n"
self.log_text.insert(tk.END, log_entry)
self.log_text.see(tk.END)
# 自动滚动到底部
self.log_text.see(tk.END)
def save_config(self):
"""保存配置到文件"""
config = configparser.ConfigParser()
# 保存设置
config['SETTINGS'] = {
'check_interval': self.interval_var.get(),
}
# 保存IP列表
ip_list = []
for item in self.ip_tree.get_children():
ip = self.ip_tree.item(item, "values")[0]
ip_list.append(ip)
config['IP_LIST'] = {'ips': json.dumps(ip_list)}
# 写入文件
try:
with open(self.config_file, 'w') as f:
config.write(f)
self.log(f"配置已保存到: {self.config_file}")
# messagebox.showinfo("成功", f"配置已保存到: {self.config_file}")
except Exception as e:
self.log(f"保存配置失败: {str(e)}", "error")
messagebox.showerror("错误", f"保存配置失败: {str(e)}")
def load_config(self):
"""从文件加载配置"""
if not os.path.exists(self.config_file):
self.log("配置文件不存在,使用默认设置")
return
try:
config = configparser.ConfigParser()
config.read(self.config_file)
# 加载设置
if 'SETTINGS' in config:
if 'check_interval' in config['SETTINGS']:
self.interval_var.set(config['SETTINGS']['check_interval'])
# 加载IP列表
if 'IP_LIST' in config and 'ips' in config['IP_LIST']:
ip_list = json.loads(config['IP_LIST']['ips'])
# 清空当前列表
for item in self.ip_tree.get_children():
self.ip_tree.delete(item)
# 添加保存的IP
for ip in ip_list:
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.ip_tree.insert("", tk.END, values=(ip, "未检测", current_time))
self.log(f"从配置文件加载了 {len(ip_list)} 个IP地址")
# self.log(f"配置已从 {self.config_file} 加载")
except Exception as e:
self.log(f"加载配置失败: {str(e)}", "error")
messagebox.showerror("错误", f"加载配置失败: {str(e)}")
def on_closing(self):
"""窗口关闭事件处理"""
# 停止监控
self.monitoring = False
# 询问是否保存配置
# if messagebox.askyesno("退出", "退出前是否保存当前配置?"):
self.save_config()
# 清理资源
if hasattr(self, 'alert_sound'):
pygame.mixer.quit()
# 关闭窗口
self.root.destroy()
def main():
# 初始化pygame mixer
try:
pygame.mixer.init()
except:
print("警告: 无法初始化声音系统")
# 创建主窗口
root = tk.Tk()
app = IPMonitorApp(root)
# 启动主循环
try:
root.mainloop()
except KeyboardInterrupt:
print("程序被用户中断")
finally:
# 清理资源
app.monitoring = False
if __name__ == "__main__":
main()