[Python] 纯文本查看 复制代码
import socket
import threading
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
class PolicyServer:
def __init__(self, host='127.0.0.1', port=5840):
self.host = host
self.port = port
self.running = False
self.server_socket = None
self.server_thread = None
# 定义策略文件内容
self.POLICY = b'''<cross-domain-policy>
<allow-access-from domain="*" to-ports="*"/>
</cross-domain-policy>\x00'''
def start(self):
"""启动策略服务器"""
if self.running:
return
try:
# 创建服务器套接字
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
# 启动服务器线程
self.server_thread = threading.Thread(target=self._run_server, daemon=True)
self.server_thread.start()
return True
except Exception as e:
messagebox.showerror("启动错误", f"无法启动服务器: {str(e)}")
return False
def _run_server(self):
"""运行服务器的主循环"""
while self.running:
try:
client, addr = self.server_socket.accept()
data = client.recv(1024)
if b"<policy-file-request/>" in data:
client.send(self.POLICY)
self.log_callback(f"已向 {addr[0]}:{addr[1]} 发送策略文件")
client.close()
except Exception as e:
if self.running: # 仅记录非正常关闭的错误
self.log_callback(f"服务器错误: {str(e)}")
def stop(self):
"""停止策略服务器"""
if not self.running:
return
self.running = False
try:
# 创建临时套接字连接以解除阻塞
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
temp_socket.connect((self.host, self.port))
temp_socket.close()
except:
pass
# 关闭服务器套接字
if self.server_socket:
self.server_socket.close()
self.server_socket = None
self.log_callback("服务器已停止")
class PolicyServerApp:
def __init__(self, root):
self.root = root
self.root.title("Flash策略服务器控制工具")
self.root.geometry("680x480")
self.root.resizable(True, True)
# 创建策略服务器实例
self.server = PolicyServer()
self.server.log_callback = self.log_message
# 设置主题
self.setup_theme()
# 创建UI
self.setup_ui()
# 日志记录
self.log_message("应用程序已启动")
self.log_message(f"策略服务器将运行在 {self.server.host}:{self.server.port}")
def setup_theme(self):
"""设置应用程序主题"""
style = ttk.Style()
style.theme_use('clam')
# 自定义颜色
self.root.configure(bg='#f0f0f0')
style.configure('TButton', font=('Segoe UI', 10), padding=6)
style.configure('TFrame', background='#f0f0f0')
style.configure('TLabel', background='#f0f0f0', font=('Segoe UI', 9))
style.configure('Header.TLabel', background='#4a6baf', foreground='white',
font=('Segoe UI', 12, 'bold'))
def setup_ui(self):
"""设置用户界面"""
# 创建主框架
main_frame = ttk.Frame(self.root, padding=15)
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
header = ttk.Label(main_frame, text="Flash策略服务器控制工具",
style='Header.TLabel', anchor=tk.CENTER)
header.pack(fill=tk.X, pady=(0, 15), ipady=10)
# 信息面板
info_frame = ttk.LabelFrame(main_frame, text="服务器信息", padding=10)
info_frame.pack(fill=tk.X, pady=5)
info_text = tk.Text(info_frame, height=4, font=('Segoe UI', 9), wrap=tk.WORD,
bg='#f8f8f8', relief=tk.FLAT)
info_text.pack(fill=tk.BOTH, expand=True)
info_text.insert(tk.END,
"此工具提供Flash策略文件服务,允许跨域访问。\n"
"服务器运行在127.0.0.1:5840端口,当收到包含"
"<policy-file-request/>的请求时,会发送策略文件。\n\n"
"注意:此服务器用于本地开发和测试目的。")
info_text.configure(state=tk.DISABLED)
# 状态面板
status_frame = ttk.Frame(main_frame)
status_frame.pack(fill=tk.X, pady=10)
ttk.Label(status_frame, text="服务器状态:", font=('Segoe UI', 10)).pack(side=tk.LEFT)
self.status_var = tk.StringVar(value="已停止")
self.status_label = ttk.Label(status_frame, textvariable=self.status_var,
font=('Segoe UI', 10, 'bold'), foreground='red')
self.status_label.pack(side=tk.LEFT, padx=10)
# 按钮面板
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
self.start_btn = ttk.Button(btn_frame, text="启动服务器",
command=self.start_server, width=15)
self.start_btn.pack(side=tk.LEFT, padx=5)
self.stop_btn = ttk.Button(btn_frame, text="停止服务器",
command=self.stop_server, width=15, state=tk.DISABLED)
self.stop_btn.pack(side=tk.LEFT, padx=5)
# 日志面板
log_frame = ttk.LabelFrame(main_frame, text="日志", padding=10)
log_frame.pack(fill=tk.BOTH, expand=True, pady=(10, 0))
self.log_area = scrolledtext.ScrolledText(log_frame, height=10,
font=('Consolas', 9), wrap=tk.WORD)
self.log_area.pack(fill=tk.BOTH, expand=True)
self.log_area.configure(state=tk.DISABLED)
# 底部信息
footer = ttk.Label(main_frame, text="© 2023 Flash策略服务器工具 | 端口: 5840",
font=('Segoe UI', 8), foreground='gray')
footer.pack(side=tk.BOTTOM, fill=tk.X, pady=(5, 0))
def start_server(self):
"""启动服务器"""
if self.server.start():
self.status_var.set("运行中")
self.status_label.configure(foreground='green')
self.start_btn.configure(state=tk.DISABLED)
self.stop_btn.configure(state=tk.NORMAL)
self.log_message("服务器已启动")
def stop_server(self):
"""停止服务器"""
self.server.stop()
self.status_var.set("已停止")
self.status_label.configure(foreground='red')
self.start_btn.configure(state=tk.NORMAL)
self.stop_btn.configure(state=tk.DISABLED)
def log_message(self, message):
"""记录消息到日志区域"""
self.log_area.configure(state=tk.NORMAL)
self.log_area.insert(tk.END, message + "\n")
self.log_area.see(tk.END)
self.log_area.configure(state=tk.DISABLED)
def on_closing(self):
"""窗口关闭时的处理"""
if self.server.running:
self.server.stop()
self.root.destroy()
if __name__ == "__main__":
root = tk.Tk()
app = PolicyServerApp(root)
root.protocol("WM_DELETE_WINDOW", app.on_closing)
root.mainloop()