吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 648|回复: 3
上一主题 下一主题
收起左侧

[求助] 让手机干活?mcp服务,求改进。

[复制链接]
跳转到指定楼层
楼主
qqy123 发表于 2026-5-2 13:24 回帖奖励
本帖最后由 qqy123 于 2026-5-2 15:56 编辑

哔哩哔哩:https://b23.tv/n6JrWkE

这个手机版的MCP服务还有很多功能无法实现,比如打开抖音点赞评论。还有不知道怎么回事,有时候写入文件时会莫名其妙的中断,难道是安全上限 60 的错?

最重要的是只能本地127.0.0.1连接,IPv4和IPv6都不可以用,有没有大佬解决一下

MCP代码和相关工具:https://wwamp.lanzouu.com/ivP953ohftmb

[Python] 纯文本查看 复制代码
#!/usr/bin/env python3
"""
Android 手机 MCP 工具服务 (全功能版 · 官方 MCP SDK)
硬件控制 · 文件管理 · 传感器 · 通讯 · 应用操作 · 命令执行 · 通知监听
启动: python server.py --port 3000
在 AetherLink 中添加 SSE 端点: http://localhost:3000/sse
"""

import json
import os
import shutil
import subprocess
import time
import zipfile
import argparse
import signal
from pathlib import Path
from typing import Optional, List, Dict

from mcp.server.fastmcp import FastMCP  # 官方 SDK 导入
import uvicorn

# 初始化 MCP 服务
mcp = FastMCP("Android Phone Controller")

# ======================= 原有手机控制工具 =======================

@mcp.tool()
def get_battery_status() -> dict:
    """获取电池状态(电量、温度、充电状态等)"""
    try:
        result = subprocess.run(["termux-battery-status"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"命令执行失败: {e.stderr}"}
    except FileNotFoundError:
        return {"error": "未安装 termux-api,请执行 pkg install termux-api"}

@mcp.tool()
def get_clipboard() -> str:
    """读取剪贴板内容"""
    try:
        result = subprocess.run(["termux-clipboard-get"], capture_output=True, text=True, check=True)
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        return f"读取剪贴板失败: {e.stderr}"

@mcp.tool()
def set_clipboard(text: str) -> str:
    """设置剪贴板内容"""
    try:
        subprocess.run(["termux-clipboard-set", text], check=True, capture_output=True, text=True)
        return f"剪贴板已设置为: {text}"
    except subprocess.CalledProcessError as e:
        return f"设置剪贴板失败: {e.stderr}"

@mcp.tool()
def take_photo(filename: str = "mcp_photo.jpg") -> str:
    """使用后置摄像头拍照(默认保存到当前目录)"""
    try:
        subprocess.run(["termux-camera-photo", "-c", "0", filename], check=True, capture_output=True, text=True)
        return f"照片已保存为: {filename}"
    except subprocess.CalledProcessError as e:
        return f"拍照失败: {e.stderr}"

@mcp.tool()
def list_sensors() -> list:
    """列出所有可用传感器"""
    try:
        result = subprocess.run(["termux-sensor", "-l"], capture_output=True, text=True, check=True)
        lines = result.stdout.strip().split("\n")
        sensors = [line.strip() for line in lines if line.strip() and not line.startswith("Available")]
        return sensors
    except subprocess.CalledProcessError as e:
        return [f"获取传感器列表失败: {e.stderr}"]

@mcp.tool()
def get_sensor_data(sensor_name: str, delay_ms: int = 1000, limit: int = 1) -> list[dict]:
    """获取指定传感器数据(默认读取一次,延时1000ms)"""
    try:
        delay_us = delay_ms * 1000
        result = subprocess.run(
            ["termux-sensor", "-s", sensor_name, "-n", str(limit), "-d", str(delay_us)],
            capture_output=True, text=True, timeout=(limit * (delay_ms / 1000) + 5)
        )
        if result.returncode != 0:
            return [{"error": result.stderr.strip()}]
        lines = result.stdout.strip().split("\n")
        data = []
        for line in lines:
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                pass
        return data
    except subprocess.TimeoutExpired:
        return [{"error": "获取传感器数据超时"}]
    except subprocess.CalledProcessError as e:
        return [{"error": f"命令执行失败: {e.stderr}"}]

@mcp.tool()
def get_device_info() -> dict:
    """获取安卓设备基本信息(型号、Android版本、SDK等)"""
    info = {}
    try:
        info["model"] = subprocess.run(["getprop", "ro.product.model"], capture_output=True, text=True).stdout.strip()
    except:
        info["model"] = "unknown"
    try:
        info["manufacturer"] = subprocess.run(["getprop", "ro.product.manufacturer"], capture_output=True, text=True).stdout.strip()
    except:
        info["manufacturer"] = "unknown"
    try:
        info["android_version"] = subprocess.run(["getprop", "ro.build.version.release"], capture_output=True, text=True).stdout.strip()
    except:
        info["android_version"] = "unknown"
    try:
        info["sdk_version"] = subprocess.run(["getprop", "ro.build.version.sdk"], capture_output=True, text=True).stdout.strip()
    except:
        info["sdk_version"] = "unknown"
    return info

@mcp.tool()
def send_notification(title: str, content: str, sound: bool = True) -> dict:
    """在手机上发送通知,返回通知ID用于后续移除"""
    try:
        cmd = ["termux-notification", "-t", title, "-c", content]
        if sound:
            cmd.append("--sound")
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        notif_id = result.stdout.strip()
        return {"message": "通知已发送", "id": notif_id}
    except subprocess.CalledProcessError as e:
        return {"error": f"发送通知失败: {e.stderr}"}
    except FileNotFoundError:
        return {"error": "未安装 termux-api"}

@mcp.tool()
def wifi_scan() -> list[dict]:
    """扫描周边 Wi-Fi 热点"""
    try:
        result = subprocess.run(["termux-wifi-scaninfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return [{"error": f"扫描失败: {e.stderr}"}]
    except json.JSONDecodeError:
        return [{"error": "无法解析 Wi-Fi 扫描结果"}]

@mcp.tool()
def vibrate(duration_ms: int = 500) -> str:
    """让手机震动指定毫秒"""
    try:
        subprocess.run(["termux-vibrate", "-d", str(duration_ms)], check=True, capture_output=True, text=True)
        return f"已震动 {duration_ms} 毫秒"
    except subprocess.CalledProcessError as e:
        return f"震动失败: {e.stderr}"
    except FileNotFoundError:
        return "未安装 termux-api"

# ======================= 新增 Termux:API 工具 =======================

@mcp.tool()
def call_number(phone_number: str) -> str:
    """拨打指定电话号码(需手动确认,AI 慎用)"""
    try:
        subprocess.run(["termux-telephony-call", phone_number], check=True, capture_output=True, text=True)
        return f"正在拨号: {phone_number}"
    except subprocess.CalledProcessError as e:
        return f"拨号失败: {e.stderr}"

@mcp.tool()
def send_sms(phone_number: str, message: str) -> str:
    """发送短信到指定号码(AI 慎用)"""
    try:
        subprocess.run(["termux-sms-send", "-n", phone_number, message], check=True, capture_output=True, text=True)
        return f"短信已发送至 {phone_number}"
    except subprocess.CalledProcessError as e:
        return f"发送短信失败: {e.stderr}"

@mcp.tool()
def read_sms_inbox(limit: int = 10) -> list[dict]:
    """读取手机短信收件箱最近的若干条"""
    try:
        result = subprocess.run(
            ["termux-sms-list", "-t", "inbox", "-l", str(limit)],
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return [{"error": f"读取短信失败: {e.stderr}"}]

@mcp.tool()
def list_contacts() -> list[dict]:
    """列出所有联系人姓名和号码"""
    try:
        result = subprocess.run(["termux-contact-list"], capture_output=True, text=True, check=True)
        lines = result.stdout.strip().split("\n")
        contacts = []
        for line in lines:
            if line:
                parts = line.split(",", 1)
                contacts.append({"name": parts[0].strip(), "phone": parts[1].strip() if len(parts) > 1 else ""})
        return contacts
    except subprocess.CalledProcessError as e:
        return [{"error": f"获取联系人失败: {e.stderr}"}]

@mcp.tool()
def list_call_log(limit: int = 20) -> list[dict]:
    """读取最近的通话记录"""
    try:
        result = subprocess.run(
            ["termux-call-log", "-l", str(limit)],
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return [{"error": f"获取通话记录失败: {e.stderr}"}]

@mcp.tool()
def get_cell_info() -> dict:
    """获取移动网络基站信息"""
    try:
        result = subprocess.run(["termux-telephony-cellinfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"获取基站信息失败: {e.stderr}"}

@mcp.tool()
def get_telephony_device_info() -> dict:
    """获取设备 IMEI、网络类型等电话相关信息"""
    try:
        result = subprocess.run(["termux-telephony-deviceinfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"获取电话设备信息失败: {e.stderr}"}

@mcp.tool()
def get_location(provider: str = "gps") -> dict:
    """获取设备当前 GPS 定位 (provider: gps/network/passive)"""
    try:
        result = subprocess.run(
            ["termux-location", "-p", provider],
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"获取定位失败: {e.stderr}"}

@mcp.tool()
def get_camera_info() -> dict:
    """获取摄像头参数(支持的分辨率、闪光灯等)"""
    try:
        result = subprocess.run(["termux-camera-info"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"获取摄像头信息失败: {e.stderr}"}

@mcp.tool()
def record_audio(filename: str = "recording.wav", duration_sec: int = 5, sample_rate: int = 44100) -> str:
    """使用麦克风录制音频(WAV格式)"""
    try:
        subprocess.run(
            ["termux-microphone-record", "-f", filename, "-d", str(duration_sec), "-r", str(sample_rate)],
            check=True, capture_output=True, text=True
        )
        return f"录音完成,保存为: {filename},时长{duration_sec}秒"
    except subprocess.CalledProcessError as e:
        return f"录音失败: {e.stderr}"

@mcp.tool()
def play_media(filepath: str) -> str:
    """播放指定的媒体文件(音频/视频)"""
    try:
        subprocess.Popen(["termux-media-player", "play", filepath])
        return f"开始播放: {filepath}"
    except Exception as e:
        return f"播放失败: {str(e)}"

@mcp.tool()
def set_volume(stream: str = "music", volume: int = 5) -> str:
    """调整音量,stream: music/ring/alarm/notification,volume: 0-7"""
    try:
        subprocess.run(["termux-volume", stream, str(volume)], check=True, capture_output=True, text=True)
        return f"{stream} 音量已设置为 {volume}"
    except subprocess.CalledProcessError as e:
        return f"调整音量失败: {e.stderr}"

@mcp.tool()
def set_brightness(brightness: int = 128) -> str:
    """设置屏幕亮度 (0-255)"""
    try:
        subprocess.run(["termux-brightness", str(brightness)], check=True, capture_output=True, text=True)
        return f"屏幕亮度已设置为 {brightness}"
    except subprocess.CalledProcessError as e:
        return f"调整亮度失败: {e.stderr}"

@mcp.tool()
def show_toast(message: str, long: bool = False) -> str:
    """在屏幕上显示短暂的 Toast 提示"""
    try:
        cmd = ["termux-toast"]
        if long:
            cmd.append("-l")
        cmd.append(message)
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return f"Toast 已显示: {message}"
    except subprocess.CalledProcessError as e:
        return f"显示 Toast 失败: {e.stderr}"

@mcp.tool()
def share_content(content: str, action: str = "send") -> str:
    """将文本或文件路径分享到其他 App。action 可选: send(发送), view(查看)"""
    try:
        subprocess.run(["termux-share", "-a", action, content], check=True, capture_output=True, text=True)
        return f"已发起分享: {content}"
    except subprocess.CalledProcessError as e:
        return f"分享失败: {e.stderr}"

@mcp.tool()
def download_file_url(url: str, description: str = "", title: str = "") -> str:
    """通过系统下载管理器下载文件"""
    try:
        cmd = ["termux-download", url]
        if description:
            cmd += ["-d", description]
        if title:
            cmd += ["-t", title]
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return f"下载任务已添加: {url}"
    except subprocess.CalledProcessError as e:
        return f"添加下载失败: {e.stderr}"

@mcp.tool()
def toggle_torch(state: bool) -> str:
    """开关手电筒 (True 开, False 关)"""
    try:
        cmd = ["termux-torch"]
        if state:
            cmd.append("on")
        else:
            cmd.append("off")
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return f"手电筒已{'打开' if state else '关闭'}"
    except subprocess.CalledProcessError as e:
        return f"控制手电筒失败: {e.stderr}"

@mcp.tool()
def wifi_enable(state: bool) -> str:
    """开关 Wi-Fi (True 打开, False 关闭)"""
    try:
        cmd = ["termux-wifi-enable", "true" if state else "false"]
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return f"Wi-Fi 已{'开启' if state else '关闭'}"
    except subprocess.CalledProcessError as e:
        return f"操作失败: {e.stderr}"

@mcp.tool()
def get_wifi_connection_info() -> dict:
    """获取当前 Wi-Fi 连接的 SSID、BSSID、信号强度等信息"""
    try:
        result = subprocess.run(["termux-wifi-connectioninfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"获取 WiFi 信息失败: {e.stderr}"}

@mcp.tool()
def storage_get_file(file_type: str = "text/plain") -> str:
    """通过系统文件选择器获取文件(需 GUI 环境)"""
    try:
        result = subprocess.run(
            ["termux-storage-get", file_type],
            capture_output=True, text=True, check=True
        )
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        return f"获取文件失败: {e.stderr} (可能需要在图形界面下运行)"

@mcp.tool()
def remove_notification(notification_id: str) -> str:
    """根据通知ID移除已显示的通知"""
    try:
        subprocess.run(["termux-notification-remove", notification_id], check=True, capture_output=True, text=True)
        return f"通知 {notification_id} 已移除"
    except subprocess.CalledProcessError as e:
        return f"移除通知失败: {e.stderr}"

# ======================= 通知读取工具(一次性快照修复版) =======================

@mcp.tool()
def get_current_notifications(wait_sec: float = 1.5) -> list[dict]:
    """
    获取当前系统通知栏中的所有通知(一次性快照)。
    直接调用 termux-notification-list,等待指定秒数后返回解析后的通知列表。
    注意:需要在系统设置中授予 Termux 通知使用权(设置 → 通知使用权 → 允许 Termux)。
    """
    try:
        result = subprocess.run(
            ["termux-notification-list"],
            capture_output=True,
            text=True,
            timeout=max(wait_sec, 1.0)
        )
        if result.stdout:
            try:
                notifications = json.loads(result.stdout)
            except json.JSONDecodeError:
                # 输出不是合法 JSON,可能是权限错误或空输出,检查 stderr
                if result.stderr and ("permission" in result.stderr.lower() or "access" in result.stderr.lower()):
                    return [{"error": "通知权限未授予,请前往设置 → 应用 → 特殊应用权限 → 通知使用权,为 Termux 开启权限"}]
                return []
            # 检查返回的数组中是否包含权限错误信息
            if isinstance(notifications, list):
                for n in notifications:
                    if isinstance(n, dict) and "error" in n and "通知使用" in n.get("error", ""):
                        return [{"error": "通知权限未授予,请前往设置 → 应用 → 特殊应用权限 → 通知使用权,为 Termux 开启权限"}]
                return notifications
            else:
                # 返回的不是数组,可能是单个对象,直接包装
                return [notifications] if isinstance(notifications, dict) else []
        return []
    except subprocess.TimeoutExpired:
        return [{"error": f"等待超时({wait_sec}秒),未获得通知数据"}]
    except FileNotFoundError:
        return [{"error": "termux-api 未正确安装,请执行 pkg install termux-api"}]
    except Exception as e:
        return [{"error": str(e)}]

# ======================= 文件操作工具 (Python 标准库) =======================

@mcp.tool()
def list_directory(path: str, detailed: bool = False) -> list:
    """列出指定目录内容(绝对路径)"""
    try:
        p = Path(path).absolute()
        if not p.is_dir():
            return [{"error": f"路径不是目录: {path}"}]
        items = []
        for entry in p.iterdir():
            if detailed:
                st = entry.stat()
                items.append({
                    "name": entry.name,
                    "type": "dir" if entry.is_dir() else "file",
                    "size": st.st_size,
                    "modified": time.ctime(st.st_mtime),
                    "permissions": oct(st.st_mode)[-3:]
                })
            else:
                items.append(entry.name + ("/" if entry.is_dir() else ""))
        return items
    except Exception as e:
        return [{"error": str(e)}]

@mcp.tool()
def read_file(path: str, max_lines: int = 500) -> str:
    """读取文本文件内容(自动检测编码)"""
    try:
        file_path = Path(path).absolute()
        if not file_path.is_file():
            return f"错误: 文件不存在: {path}"
        encodings = ['utf-8', 'latin-1', 'gbk']
        for enc in encodings:
            try:
                with open(file_path, 'r', encoding=enc) as f:
                    lines = f.readlines()
                break
            except UnicodeDecodeError:
                continue
        else:
            return "错误: 无法以文本格式读取该文件(可能是二进制文件)"
        if max_lines and len(lines) > max_lines:
            lines = lines[:max_lines]
            lines.append(f"\n[... 仅显示前 {max_lines} 行 ...]")
        return ''.join(lines)
    except Exception as e:
        return f"读取文件出错: {str(e)}"

@mcp.tool()
def create_file(path: str) -> str:
    """创建空文件(已存在则报错)"""
    try:
        file_path = Path(path).absolute()
        if file_path.exists():
            return f"错误: 文件或目录已存在: {path}"
        file_path.touch()
        return f"文件已创建: {file_path}"
    except Exception as e:
        return f"创建文件失败: {str(e)}"

@mcp.tool()
def write_file(path: str, content: str, overwrite: bool = False) -> str:
    """写入内容到文件(默认不覆盖)"""
    try:
        file_path = Path(path).absolute()
        if file_path.exists() and not overwrite:
            return f"错误: 文件已存在,如需覆盖请设置 overwrite=True"
        mode = 'w' if overwrite or not file_path.exists() else 'x'
        with open(file_path, mode, encoding='utf-8') as f:
            f.write(content)
        return f"内容已写入: {file_path}"
    except Exception as e:
        return f"写入文件失败: {str(e)}"

@mcp.tool()
def append_file(path: str, content: str) -> str:
    """向文件末尾追加内容"""
    try:
        file_path = Path(path).absolute()
        with open(file_path, 'a', encoding='utf-8') as f:
            f.write(content)
        return f"内容已追加到: {file_path}"
    except Exception as e:
        return f"追加文件失败: {str(e)}"

@mcp.tool()
def copy_file(source: str, destination: str) -> str:
    """复制文件或目录"""
    try:
        src = Path(source).absolute()
        dst = Path(destination).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source}"
        if src.is_dir():
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
        return f"复制成功: {source} -> {destination}"
    except Exception as e:
        return f"复制失败: {str(e)}"

@mcp.tool()
def move_file(source: str, destination: str) -> str:
    """移动文件或目录"""
    try:
        src = Path(source).absolute()
        dst = Path(destination).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source}"
        shutil.move(str(src), str(dst))
        return f"移动成功: {source} -> {destination}"
    except Exception as e:
        return f"移动失败: {str(e)}"

@mcp.tool()
def delete_file(path: str) -> str:
    """删除文件或目录(危险操作,不可恢复)"""
    try:
        target = Path(path).absolute()
        if not target.exists():
            return f"错误: 路径不存在: {path}"
        if target.is_dir():
            shutil.rmtree(target)
        else:
            target.unlink()
        return f"已删除: {path}"
    except Exception as e:
        return f"删除失败: {str(e)}"

@mcp.tool()
def rename_file(path: str, new_name: str) -> str:
    """重命名文件或目录"""
    try:
        old = Path(path).absolute()
        if not old.exists():
            return f"错误: 路径不存在: {path}"
        new = old.parent / new_name
        old.rename(new)
        return f"重命名成功: {path} -> {new}"
    except Exception as e:
        return f"重命名失败: {str(e)}"

@mcp.tool()
def stat_file(path: str) -> dict:
    """查看文件或目录属性"""
    try:
        p = Path(path).absolute()
        if not p.exists():
            return {"error": f"路径不存在: {path}"}
        s = p.stat()
        return {
            "name": p.name,
            "path": str(p),
            "type": "目录" if p.is_dir() else "文件",
            "size_bytes": s.st_size,
            "permissions": oct(s.st_mode)[-3:],
            "last_modified": time.ctime(s.st_mtime),
            "last_accessed": time.ctime(s.st_atime),
            "is_symlink": p.is_symlink(),
        }
    except Exception as e:
        return {"error": str(e)}

@mcp.tool()
def chmod_file(path: str, mode: str) -> str:
    """修改文件权限(八进制,如 '755')"""
    try:
        target = Path(path).absolute()
        if not target.exists():
            return f"错误: 路径不存在: {path}"
        mask = int(mode, 8)
        os.chmod(target, mask)
        return f"权限已修改为 {mode}: {path}"
    except Exception as e:
        return f"修改权限失败: {str(e)}"

@mcp.tool()
def make_directory(path: str, parents: bool = True) -> str:
    """创建目录"""
    try:
        p = Path(path).absolute()
        if parents:
            p.mkdir(parents=True, exist_ok=True)
        else:
            p.mkdir()
        return f"目录已创建: {p}"
    except Exception as e:
        return f"创建目录失败: {str(e)}"

@mcp.tool()
def compress_files(source_path: str, zip_path: str) -> str:
    """压缩为 zip"""
    try:
        src = Path(source_path).absolute()
        dst = Path(zip_path).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source_path}"
        with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as zf:
            if src.is_dir():
                for root, dirs, files in os.walk(src):
                    for f in files:
                        file_path = os.path.join(root, f)
                        arcname = os.path.relpath(file_path, src.parent)
                        zf.write(file_path, arcname)
            else:
                zf.write(src, src.name)
        return f"压缩成功: {source_path} -> {zip_path}"
    except Exception as e:
        return f"压缩失败: {str(e)}"

@mcp.tool()
def decompress_file(zip_path: str, destination: str = ".") -> str:
    """解压 zip"""
    try:
        zip_file = Path(zip_path).absolute()
        dest_dir = Path(destination).absolute()
        if not zip_file.is_file():
            return f"错误: zip文件不存在: {zip_path}"
        dest_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(dest_dir)
        return f"解压完成: {zip_path} -> {destination}"
    except zipfile.BadZipFile:
        return "错误: 文件不是有效的 zip 格式"
    except Exception as e:
        return f"解压失败: {str(e)}"

@mcp.tool()
def upload_file(path: str, base64_content: str) -> str:
    """通过 Base64 上传文件(小文件)"""
    try:
        import base64
        file_path = Path(path).absolute()
        binary_data = base64.b64decode(base64_content)
        with open(file_path, 'wb') as f:
            f.write(binary_data)
        return f"文件已上传: {path}({len(binary_data)} 字节)"
    except Exception as e:
        return f"上传失败: {str(e)}"

@mcp.tool()
def download_file(path: str) -> str:
    """下载文件为 Base64 编码(<10MB)"""
    try:
        import base64
        file_path = Path(path).absolute()
        if not file_path.is_file():
            return f"错误: 文件不存在: {path}"
        size = file_path.stat().st_size
        if size > 10 * 1024 * 1024:
            return f"错误: 文件过大({size} 字节),超过 10MB 限制"
        with open(file_path, 'rb') as f:
            data = f.read()
        return base64.b64encode(data).decode('utf-8')
    except Exception as e:
        return f"下载出错: {str(e)}"

# ======================= 命令行工具(高风险,智能权限) =======================

@mcp.tool()
def exec_termux_command(command: str, timeout_sec: int = 30) -> str:
    """
    &#9888;&#65039; 权限控制规则:
1. 每次执行前必须询问,提供4个选项:
   - “仅本次允许”:仅此一条
   - “一直允许”:后续不再询问,直接执行
   - “拒绝”:不执行
   - “自定义授权”:用户当场口述任意规则(如次数、目录、命令白名单、时间限制、任务范围等),后续命令直接按规则执行,不再询问。额度用完/用户取消则自动恢复询问。
2. 选择“一直允许”后,后续无需再问。
3. 选择“仅本次允许”或“拒绝”后,下次仍需重新询问。
4. 选择“自定义授权”或者用户有自定义意向的,请遵守自定义规则。
5. 未经明确表态,禁止执行。

    参数:
      command: 要执行的 shell 命令(字符串)
      timeout_sec: 超时秒数(最大 60 秒)
    """
    timeout_sec = min(timeout_sec, 60)   # 安全上限
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout_sec
        )
        output = f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        if len(output) > 8000:
            output = output[:8000] + "\n...(输出已截断)"
        return output
    except subprocess.TimeoutExpired:
        return f"命令执行超时(>{timeout_sec}秒),已强制终止: {command}"
    except Exception as e:
        return f"执行命令时发生异常: {str(e)}"

# ======================= 启动服务 =======================
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Termux 全能 MCP Server")
    parser.add_argument("--port", type=int, default=3000, help="服务端口")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="监听地址")
    args = parser.parse_args()

    print(f"""
╔══════════════════════════════════════════════╗
║   &#128241; Termux 全能 MCP Server (官方SDK版) 已启动 ║
║                                               ║
║   SSE 端点: http://{args.host}:{args.port}/sse  ║
║   消息端点: http://{args.host}:{args.port}/messages/ ║
║                                               ║
║   已加载工具: 电池/传感器/文件/通讯/命令…      ║
║   按 Ctrl+C 停止服务                          ║
╚══════════════════════════════════════════════╝
    """)

    app = mcp.sse_app()
    uvicorn.run(app, host=args.host, port=args.port, log_level="warning")

免费评分

参与人数 1热心值 +1 收起 理由
xiwangpl56 + 1 鼓励转贴优秀软件安全工具和文档!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

沙发
xlinux 发表于 2026-5-2 16:07
有进展吗?
3#
zchld 发表于 2026-5-2 20:04
[Asm] 纯文本查看 复制代码
#!/usr/bin/env python3
"""
Android 手机 MCP 工具服务 (全功能版 · 官方 MCP SDK)
硬件控制 · 文件管理 · 传感器 · 通讯 · 应用操作 · 命令执行 · 通知监听
启动: python server.py --port 3000
在 AetherLink 中添加 SSE 端点: http://localhost:3000/sse
"""

import json
import os
import shutil
import subprocess
import time
import zipfile
import argparse
import signal
import sys
from pathlib import Path
from typing import Optional, List, Dict

try:
    from mcp.server.fastmcp import FastMCP
    import uvicorn
except ImportError:
    print("&#10060; 依赖缺失,请先安装:pip install mcp[fastmcp] uvicorn")
    sys.exit(1)

# 初始化 MCP 服务
mcp = FastMCP("Android Phone Controller")

# 全局退出标志
stop_server = False

def signal_handler(sig, frame):
    global stop_server
    print("\n&#128721; 正在停止 MCP 服务...")
    stop_server = True
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

# ======================= 手机硬件与状态工具 =======================

@mcp.tool()
def get_battery_status() -> dict:
    """获取电池状态(电量、温度、充电状态等)"""
    try:
        result = subprocess.run(["termux-battery-status"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return {"error": f"命令执行失败: {e.stderr.strip()}"}
    except FileNotFoundError:
        return {"error": "未安装 termux-api,请执行 pkg install termux-api"}

@mcp.tool()
def get_clipboard() -> str:
    """读取剪贴板内容"""
    try:
        result = subprocess.run(["termux-clipboard-get"], capture_output=True, text=True, check=True)
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        return f"读取剪贴板失败: {e.stderr.strip()}"

@mcp.tool()
def set_clipboard(text: str) -> str:
    """设置剪贴板内容"""
    try:
        subprocess.run(["termux-clipboard-set", text], check=True, capture_output=True, text=True)
        return f"&#9989; 剪贴板已设置: {text}"
    except subprocess.CalledProcessError as e:
        return f"&#10060; 设置剪贴板失败: {e.stderr.strip()}"

@mcp.tool()
def take_photo(filename: str = "mcp_photo.jpg") -> str:
    """使用后置摄像头拍照"""
    try:
        subprocess.run(["termux-camera-photo", "-c", "0", filename], check=True, capture_output=True, text=True)
        return f"&#9989; 照片已保存: {filename}"
    except subprocess.CalledProcessError as e:
        return f"&#10060; 拍照失败: {e.stderr.strip()}"

@mcp.tool()
def list_sensors() -> list:
    """列出所有可用传感器"""
    try:
        result = subprocess.run(["termux-sensor", "-l"], capture_output=True, text=True, check=True)
        lines = result.stdout.strip().split("\n")
        return [line.strip() for line in lines if line.strip() and not line.startswith("Available")]
    except subprocess.CalledProcessError:
        return [{"error": "获取传感器列表失败"}]

@mcp.tool()
def get_sensor_data(sensor_name: str, delay_ms: int = 1000, limit: int = 1) -> list:
    """获取指定传感器数据"""
    try:
        delay_us = delay_ms * 1000
        timeout = limit * (delay_ms / 1000) + 3
        result = subprocess.run(
            ["termux-sensor", "-s", sensor_name, "-n", str(limit), "-d", str(delay_us)],
            capture_output=True, text=True, timeout=timeout
        )
        if result.returncode != 0:
            return [{"error": result.stderr.strip()}]
        data = []
        for line in result.stdout.strip().splitlines():
            try:
                data.append(json.loads(line))
            except json.JSONDecodeError:
                continue
        return data
    except subprocess.TimeoutExpired:
        return [{"error": "传感器读取超时"}]
    except Exception as e:
        return [{"error": str(e)}]

@mcp.tool()
def get_device_info() -> dict:
    """获取设备型号、系统版本信息"""
    info = {}
    props = {
        "model": "ro.product.model",
        "manufacturer": "ro.product.manufacturer",
        "android_version": "ro.build.version.release",
        "sdk_version": "ro.build.version.sdk"
    }
    for key, prop in props.items():
        try:
            info[key] = subprocess.run(["getprop", prop], capture_output=True, text=True).stdout.strip()
        except:
            info[key] = "unknown"
    return info

# ======================= 通知与提示工具 =======================

@mcp.tool()
def send_notification(title: str, content: str, sound: bool = True) -> dict:
    """发送手机通知"""
    try:
        cmd = ["termux-notification", "-t", title, "-c", content]
        if sound: cmd.append("--sound")
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return {"status": "success", "message": "通知已发送", "id": result.stdout.strip()}
    except:
        return {"status": "error", "message": "发送通知失败"}

@mcp.tool()
def remove_notification(notification_id: str) -> str:
    """移除指定ID的通知"""
    try:
        subprocess.run(["termux-notification-remove", notification_id], check=True, capture_output=True, text=True)
        return f"&#9989; 通知 {notification_id} 已移除"
    except:
        return "&#10060; 移除通知失败"

@mcp.tool()
def get_current_notifications() -> list:
    """获取当前所有通知(需开启通知权限)"""
    try:
        result = subprocess.run(["termux-notification-list"], capture_output=True, text=True, timeout=3)
        if not result.stdout:
            return []
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        return [{"error": "请授予 Termux 通知使用权:设置 → 特殊权限 → 通知使用权"}]
    except:
        return [{"error": "获取通知失败"}]

@mcp.tool()
def show_toast(message: str, long: bool = False) -> str:
    """显示短时提示 Toast"""
    try:
        cmd = ["termux-toast"]
        if long: cmd.append("-l")
        cmd.append(message)
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return f"&#9989; 显示提示: {message}"
    except:
        return "&#10060; 显示提示失败"

# ======================= 网络与Wi-Fi工具 =======================

@mcp.tool()
def wifi_scan() -> list:
    """扫描周围Wi-Fi热点"""
    try:
        result = subprocess.run(["termux-wifi-scaninfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except:
        return [{"error": "Wi-Fi 扫描失败"}]

@mcp.tool()
def get_wifi_connection_info() -> dict:
    """获取当前连接的Wi-Fi信息"""
    try:
        result = subprocess.run(["termux-wifi-connectioninfo"], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except:
        return {"error": "获取Wi-Fi信息失败"}

@mcp.tool()
def wifi_enable(state: bool) -> str:
    """开关Wi-Fi"""
    try:
        subprocess.run(["termux-wifi-enable", "true" if state else "false"], check=True, capture_output=True, text=True)
        return f"&#9989; Wi-Fi 已{'开启' if state else '关闭'}"
    except:
        return "&#10060; Wi-Fi 操作失败"

# ======================= 通话、短信、联系人 =======================

@mcp.tool()
def call_number(phone_number: str) -> str:
    """拨打电话(需手动接听)"""
    try:
        subprocess.run(["termux-telephony-call", phone_number], check=True, capture_output=True, text=True)
        return f"&#9989; 正在拨号: {phone_number}"
    except:
        return "&#10060; 拨号失败"

@mcp.tool()
def send_sms(phone_number: str, message: str) -> str:
    """发送短信"""
    try:
        subprocess.run(["termux-sms-send", "-n", phone_number, message], check=True, capture_output=True, text=True)
        return f"&#9989; 短信已发送至 {phone_number}"
    except:
        return "&#10060; 发送短信失败"

@mcp.tool()
def read_sms_inbox(limit: int = 10) -> list:
    """读取最近短信"""
    try:
        result = subprocess.run(["termux-sms-list", "-t", "inbox", "-l", str(limit)], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except:
        return [{"error": "读取短信失败"}]

@mcp.tool()
def list_contacts() -> list:
    """获取联系人列表"""
    try:
        result = subprocess.run(["termux-contact-list"], capture_output=True, text=True, check=True)
        contacts = []
        for line in result.stdout.strip().splitlines():
            if line:
                parts = line.split(",", 1)
                contacts.append({"name": parts[0].strip(), "phone": parts[1].strip() if len(parts) > 1 else ""})
        return contacts
    except:
        return [{"error": "获取联系人失败"}]

@mcp.tool()
def list_call_log(limit: int = 20) -> list:
    """获取通话记录"""
    try:
        result = subprocess.run(["termux-call-log", "-l", str(limit)], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except:
        return [{"error": "获取通话记录失败"}]

# ======================= 定位、传感器、手电筒、震动 =======================

@mcp.tool()
def get_location(provider: str = "gps") -> dict:
    """获取GPS定位"""
    try:
        result = subprocess.run(["termux-location", "-p", provider], capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except:
        return {"error": "定位失败,请开启位置权限"}

@mcp.tool()
def toggle_torch(state: bool) -> str:
    """开关手电筒"""
    try:
        subprocess.run(["termux-torch", "on" if state else "off"], check=True, capture_output=True, text=True)
        return f"&#9989; 手电筒已{'打开' if state else '关闭'}"
    except:
        return "&#10060; 手电筒操作失败"

@mcp.tool()
def vibrate(duration_ms: int = 500) -> str:
    """手机震动"""
    try:
        subprocess.run(["termux-vibrate", "-d", str(duration_ms)], check=True, capture_output=True, text=True)
        return f"&#9989; 已震动 {duration_ms}ms"
    except:
        return "&#10060; 震动失败"

# ======================= 声音、亮度、媒体 =======================

@mcp.tool()
def set_volume(stream: str = "music", volume: int = 5) -> str:
    """设置音量(0-7)"""
    volume = max(0, min(7, volume))
    try:
        subprocess.run(["termux-volume", stream, str(volume)], check=True, capture_output=True, text=True)
        return f"&#9989; {stream} 音量设为 {volume}"
    except:
        return "&#10060; 设置音量失败"

@mcp.tool()
def set_brightness(brightness: int = 128) -> str:
    """设置屏幕亮度(0-255)"""
    brightness = max(0, min(255, brightness))
    try:
        subprocess.run(["termux-brightness", str(brightness)], check=True, capture_output=True, text=True)
        return f"&#9989; 亮度设为 {brightness}"
    except:
        return "&#10060; 设置亮度失败"

@mcp.tool()
def play_media(filepath: str) -> str:
    """播放音频/视频"""
    try:
        subprocess.Popen(["termux-media-player", "play", filepath])
        return f"&#9989; 开始播放: {filepath}"
    except:
        return "&#10060; 播放失败"

@mcp.tool()
def record_audio(filename: str = "recording.wav", duration_sec: int = 5, sample_rate: int = 44100) -> str:
    """录制麦克风声音"""
    try:
        subprocess.run(
            ["termux-microphone-record", "-f", filename, "-d", str(duration_sec), "-r", str(sample_rate)],
            check=True, capture_output=True, text=True
        )
        return f"&#9989; 录音完成: {filename}"
    except:
        return "&#10060; 录音失败"

# ======================= 文件系统工具(防中断终极版) =======================

@mcp.tool()
def list_directory(path: str = ".", detailed: bool = False) -> list:
    """列出目录内容"""
    try:
        p = Path(path).absolute()
        if not p.is_dir():
            return [{"error": f"不是目录: {path}"}]
        items = []
        for entry in p.iterdir():
            if detailed:
                st = entry.stat()
                items.append({
                    "name": entry.name,
                    "type": "dir" if entry.is_dir() else "file",
                    "size": st.st_size,
                    "modified": time.ctime(st.st_mtime)
                })
            else:
                items.append(entry.name + ("/" if entry.is_dir() else ""))
        return items
    except Exception as e:
        return [{"error": str(e)}]

@mcp.tool()
def read_file(path: str, max_lines: int = 500) -> str:
    """读取文本文件"""
    try:
        p = Path(path).absolute()
        if not p.is_file():
            return f"&#10060; 文件不存在: {path}"
        for enc in ["utf-8", "gbk", "latin-1"]:
            try:
                with open(p, "r", encoding=enc) as f:
                    lines = f.readlines()
                break
            except:
                continue
        else:
            return "&#10060; 无法读取(二进制文件)"
        if len(lines) > max_lines:
            lines = lines[:max_lines] + [f"\n... 仅显示前 {max_lines} 行"]
        return "".join(lines)
    except Exception as e:
        return f"&#10060; 读取失败: {str(e)}"

@mcp.tool()
def write_file(path: str, content: str, overwrite: bool = False) -> str:
    """写入文件(防中断 + 强制刷入磁盘)"""
    try:
        p = Path(path).absolute()
        if p.exists() and not overwrite:
            return "&#10060; 文件已存在,如需覆盖请设置 overwrite=True"
        
        with open(p, "w", encoding="utf-8", newline="") as f:
            f.write(content)
            f.flush()        # 刷新缓冲区
            os.fsync(f.fileno())  # 系统级强制写入磁盘(永不丢失)
        
        return f"&#9989; 已安全写入: {path}({len(content)} 字符)"
    except Exception as e:
        return f"&#10060; 写入失败: {str(e)}"

@mcp.tool()
def append_file(path: str, content: str) -> str:
    """追加文件(防中断 + 强制刷入磁盘)"""
    try:
        p = Path(path).absolute()
        
        with open(p, "a", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())
        
        return f"&#9989; 已安全追加: {path}"
    except Exception as e:
        return f"&#10060; 追加失败: {str(e)}"

@mcp.tool()
def delete_file(path: str) -> str:
    """删除文件/目录(危险)"""
    try:
        p = Path(path).absolute()
        if not p.exists():
            return "&#10060; 路径不存在"
        if p.is_dir():
            shutil.rmtree(p)
        else:
            p.unlink()
        return f"&#9989; 已删除: {path}"
    except Exception as e:
        return f"&#10060; 删除失败: {str(e)}"

@mcp.tool()
def copy_file(source: str, destination: str) -> str:
    """复制文件或目录"""
    try:
        src = Path(source).absolute()
        dst = Path(destination).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source}"
        if src.is_dir():
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
        return f"复制成功: {source} -> {destination}"
    except Exception as e:
        return f"复制失败: {str(e)}"

@mcp.tool()
def move_file(source: str, destination: str) -> str:
    """移动文件或目录"""
    try:
        src = Path(source).absolute()
        dst = Path(destination).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source}"
        shutil.move(str(src), str(dst))
        return f"移动成功: {source} -> {destination}"
    except Exception as e:
        return f"移动失败: {str(e)}"

@mcp.tool()
def rename_file(path: str, new_name: str) -> str:
    """重命名文件或目录"""
    try:
        old = Path(path).absolute()
        if not old.exists():
            return f"错误: 路径不存在: {path}"
        new = old.parent / new_name
        old.rename(new)
        return f"重命名成功: {path} -> {new}"
    except Exception as e:
        return f"重命名失败: {str(e)}"

@mcp.tool()
def stat_file(path: str) -> dict:
    """查看文件或目录属性"""
    try:
        p = Path(path).absolute()
        if not p.exists():
            return {"error": f"路径不存在: {path}"}
        s = p.stat()
        return {
            "name": p.name,
            "path": str(p),
            "type": "目录" if p.is_dir() else "文件",
            "size_bytes": s.st_size,
            "permissions": oct(s.st_mode)[-3:],
            "last_modified": time.ctime(s.st_mtime),
            "last_accessed": time.ctime(s.st_atime),
            "is_symlink": p.is_symlink(),
        }
    except Exception as e:
        return {"error": str(e)}

@mcp.tool()
def chmod_file(path: str, mode: str) -> str:
    """修改文件权限(八进制,如 '755')"""
    try:
        target = Path(path).absolute()
        if not target.exists():
            return f"错误: 路径不存在: {path}"
        mask = int(mode, 8)
        os.chmod(target, mask)
        return f"权限已修改为 {mode}: {path}"
    except Exception as e:
        return f"修改权限失败: {str(e)}"

@mcp.tool()
def make_directory(path: str, parents: bool = True) -> str:
    """创建目录"""
    try:
        p = Path(path).absolute()
        if parents:
            p.mkdir(parents=True, exist_ok=True)
        else:
            p.mkdir()
        return f"目录已创建: {p}"
    except Exception as e:
        return f"创建目录失败: {str(e)}"

@mcp.tool()
def compress_files(source_path: str, zip_path: str) -> str:
    """压缩为 zip"""
    try:
        src = Path(source_path).absolute()
        dst = Path(zip_path).absolute()
        if not src.exists():
            return f"错误: 源路径不存在: {source_path}"
        with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as zf:
            if src.is_dir():
                for root, dirs, files in os.walk(src):
                    for f in files:
                        file_path = os.path.join(root, f)
                        arcname = os.relpath(file_path, src.parent)
                        zf.write(file_path, arcname)
            else:
                zf.write(src, src.name)
        return f"压缩成功: {source_path} -> {zip_path}"
    except Exception as e:
        return f"压缩失败: {str(e)}"

@mcp.tool()
def decompress_file(zip_path: str, destination: str = ".") -> str:
    """解压 zip"""
    try:
        zip_file = Path(zip_path).absolute()
        dest_dir = Path(destination).absolute()
        if not zip_file.is_file():
            return f"错误: zip文件不存在: {zip_path}"
        dest_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(dest_dir)
        return f"解压完成: {zip_path} -> {destination}"
    except zipfile.BadZipFile:
        return "错误: 文件不是有效的 zip 格式"
    except Exception as e:
        return f"解压失败: {str(e)}"

@mcp.tool()
def upload_file(path: str, base64_content: str) -> str:
    """通过 Base64 上传文件(小文件)"""
    try:
        import base64
        file_path = Path(path).absolute()
        binary_data = base64.b64decode(base64_content)
        with open(file_path, 'wb') as f:
            f.write(binary_data)
            f.flush()
            os.fsync(f.fileno())
        return f"文件已上传: {path}({len(binary_data)} 字节)"
    except Exception as e:
        return f"上传失败: {str(e)}"

@mcp.tool()
def download_file(path: str) -> str:
    """下载文件为 Base64 编码(<10MB)"""
    try:
        import base64
        file_path = Path(path).absolute()
        if not file_path.is_file():
            return f"错误: 文件不存在: {path}"
        size = file_path.stat().st_size
        if size > 10 * 1024 * 1024:
            return f"错误: 文件过大({size} 字节),超过 10MB 限制"
        with open(file_path, 'rb') as f:
            data = f.read()
        return base64.b64encode(data).decode('utf-8')
    except Exception as e:
        return f"下载出错: {str(e)}"

# ======================= 高危命令执行 =======================

@mcp.tool()
def exec_termux_command(command: str, timeout_sec: int = 30) -> str:
    """
    &#9888;&#65039; 高危命令执行
    请谨慎使用,默认需要用户确认
    """
    timeout_sec = min(timeout_sec, 120)  # 最大放宽到120秒
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout_sec
        )
        output = f"STDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
        if len(output) > 8000:
            output = output[:8000] + "\n...(输出过长已截断)"
        return output
    except subprocess.TimeoutExpired:
        return f"&#9200; 命令超时(>{timeout_sec}s)"
    except Exception as e:
        return f"&#10060; 执行失败: {str(e)}"

# ======================= 启动服务 =======================

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Termux Android MCP 控制服务")
    parser.add_argument("--port", type=int, default=3000, help="服务端口")
    parser.add_argument("--host", default="0.0.0.0", help="监听地址")
    args = parser.parse_args()

    print(f"""
&#128640; Termux 全能 MCP 服务已启动
&#128225; SSE 端点: http://{args.host}:{args.port}/sse
&#128295; 支持: 硬件/文件/通知/Wi-Fi/通话/短信/定位/命令
按 Ctrl+C 停止服务
    """)

    try:
        app = mcp.sse_app()
        uvicorn.run(app, host=args.host, port=args.port, log_level="warning")
    except Exception as e:
        print(f"&#10060; 服务启动失败: {e}")
4#
xxggla666 发表于 2026-5-2 22:30
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2026-5-3 12:23

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表