[Python] 纯文本查看 复制代码
#!/usr/bin/env python3
"""
Android 手机 MCP 工具服务 (全功能版 · 官方 MCP SDK)
硬件控制 · 文件管理 · 传感器 · 通讯 · 应用操作 · 命令执行 · 通知监听
启动: python server.py --port 3000
在 AetherLink 中添加 SSE 端点: http://localhost:3000/sse
"""
import json
import os
import shutil
import subprocess
import time
import zipfile
import argparse
import signal
from pathlib import Path
from typing import Optional, List, Dict
from mcp.server.fastmcp import FastMCP # 官方 SDK 导入
import uvicorn
# 初始化 MCP 服务
mcp = FastMCP("Android Phone Controller")
# ======================= 原有手机控制工具 =======================
@mcp.tool()
def get_battery_status() -> dict:
"""获取电池状态(电量、温度、充电状态等)"""
try:
result = subprocess.run(["termux-battery-status"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"命令执行失败: {e.stderr}"}
except FileNotFoundError:
return {"error": "未安装 termux-api,请执行 pkg install termux-api"}
@mcp.tool()
def get_clipboard() -> str:
"""读取剪贴板内容"""
try:
result = subprocess.run(["termux-clipboard-get"], capture_output=True, text=True, check=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
return f"读取剪贴板失败: {e.stderr}"
@mcp.tool()
def set_clipboard(text: str) -> str:
"""设置剪贴板内容"""
try:
subprocess.run(["termux-clipboard-set", text], check=True, capture_output=True, text=True)
return f"剪贴板已设置为: {text}"
except subprocess.CalledProcessError as e:
return f"设置剪贴板失败: {e.stderr}"
@mcp.tool()
def take_photo(filename: str = "mcp_photo.jpg") -> str:
"""使用后置摄像头拍照(默认保存到当前目录)"""
try:
subprocess.run(["termux-camera-photo", "-c", "0", filename], check=True, capture_output=True, text=True)
return f"照片已保存为: {filename}"
except subprocess.CalledProcessError as e:
return f"拍照失败: {e.stderr}"
@mcp.tool()
def list_sensors() -> list:
"""列出所有可用传感器"""
try:
result = subprocess.run(["termux-sensor", "-l"], capture_output=True, text=True, check=True)
lines = result.stdout.strip().split("\n")
sensors = [line.strip() for line in lines if line.strip() and not line.startswith("Available")]
return sensors
except subprocess.CalledProcessError as e:
return [f"获取传感器列表失败: {e.stderr}"]
@mcp.tool()
def get_sensor_data(sensor_name: str, delay_ms: int = 1000, limit: int = 1) -> list[dict]:
"""获取指定传感器数据(默认读取一次,延时1000ms)"""
try:
delay_us = delay_ms * 1000
result = subprocess.run(
["termux-sensor", "-s", sensor_name, "-n", str(limit), "-d", str(delay_us)],
capture_output=True, text=True, timeout=(limit * (delay_ms / 1000) + 5)
)
if result.returncode != 0:
return [{"error": result.stderr.strip()}]
lines = result.stdout.strip().split("\n")
data = []
for line in lines:
try:
data.append(json.loads(line))
except json.JSONDecodeError:
pass
return data
except subprocess.TimeoutExpired:
return [{"error": "获取传感器数据超时"}]
except subprocess.CalledProcessError as e:
return [{"error": f"命令执行失败: {e.stderr}"}]
@mcp.tool()
def get_device_info() -> dict:
"""获取安卓设备基本信息(型号、Android版本、SDK等)"""
info = {}
try:
info["model"] = subprocess.run(["getprop", "ro.product.model"], capture_output=True, text=True).stdout.strip()
except:
info["model"] = "unknown"
try:
info["manufacturer"] = subprocess.run(["getprop", "ro.product.manufacturer"], capture_output=True, text=True).stdout.strip()
except:
info["manufacturer"] = "unknown"
try:
info["android_version"] = subprocess.run(["getprop", "ro.build.version.release"], capture_output=True, text=True).stdout.strip()
except:
info["android_version"] = "unknown"
try:
info["sdk_version"] = subprocess.run(["getprop", "ro.build.version.sdk"], capture_output=True, text=True).stdout.strip()
except:
info["sdk_version"] = "unknown"
return info
@mcp.tool()
def send_notification(title: str, content: str, sound: bool = True) -> dict:
"""在手机上发送通知,返回通知ID用于后续移除"""
try:
cmd = ["termux-notification", "-t", title, "-c", content]
if sound:
cmd.append("--sound")
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
notif_id = result.stdout.strip()
return {"message": "通知已发送", "id": notif_id}
except subprocess.CalledProcessError as e:
return {"error": f"发送通知失败: {e.stderr}"}
except FileNotFoundError:
return {"error": "未安装 termux-api"}
@mcp.tool()
def wifi_scan() -> list[dict]:
"""扫描周边 Wi-Fi 热点"""
try:
result = subprocess.run(["termux-wifi-scaninfo"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return [{"error": f"扫描失败: {e.stderr}"}]
except json.JSONDecodeError:
return [{"error": "无法解析 Wi-Fi 扫描结果"}]
@mcp.tool()
def vibrate(duration_ms: int = 500) -> str:
"""让手机震动指定毫秒"""
try:
subprocess.run(["termux-vibrate", "-d", str(duration_ms)], check=True, capture_output=True, text=True)
return f"已震动 {duration_ms} 毫秒"
except subprocess.CalledProcessError as e:
return f"震动失败: {e.stderr}"
except FileNotFoundError:
return "未安装 termux-api"
# ======================= 新增 Termux:API 工具 =======================
@mcp.tool()
def call_number(phone_number: str) -> str:
"""拨打指定电话号码(需手动确认,AI 慎用)"""
try:
subprocess.run(["termux-telephony-call", phone_number], check=True, capture_output=True, text=True)
return f"正在拨号: {phone_number}"
except subprocess.CalledProcessError as e:
return f"拨号失败: {e.stderr}"
@mcp.tool()
def send_sms(phone_number: str, message: str) -> str:
"""发送短信到指定号码(AI 慎用)"""
try:
subprocess.run(["termux-sms-send", "-n", phone_number, message], check=True, capture_output=True, text=True)
return f"短信已发送至 {phone_number}"
except subprocess.CalledProcessError as e:
return f"发送短信失败: {e.stderr}"
@mcp.tool()
def read_sms_inbox(limit: int = 10) -> list[dict]:
"""读取手机短信收件箱最近的若干条"""
try:
result = subprocess.run(
["termux-sms-list", "-t", "inbox", "-l", str(limit)],
capture_output=True, text=True, check=True
)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return [{"error": f"读取短信失败: {e.stderr}"}]
@mcp.tool()
def list_contacts() -> list[dict]:
"""列出所有联系人姓名和号码"""
try:
result = subprocess.run(["termux-contact-list"], capture_output=True, text=True, check=True)
lines = result.stdout.strip().split("\n")
contacts = []
for line in lines:
if line:
parts = line.split(",", 1)
contacts.append({"name": parts[0].strip(), "phone": parts[1].strip() if len(parts) > 1 else ""})
return contacts
except subprocess.CalledProcessError as e:
return [{"error": f"获取联系人失败: {e.stderr}"}]
@mcp.tool()
def list_call_log(limit: int = 20) -> list[dict]:
"""读取最近的通话记录"""
try:
result = subprocess.run(
["termux-call-log", "-l", str(limit)],
capture_output=True, text=True, check=True
)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return [{"error": f"获取通话记录失败: {e.stderr}"}]
@mcp.tool()
def get_cell_info() -> dict:
"""获取移动网络基站信息"""
try:
result = subprocess.run(["termux-telephony-cellinfo"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"获取基站信息失败: {e.stderr}"}
@mcp.tool()
def get_telephony_device_info() -> dict:
"""获取设备 IMEI、网络类型等电话相关信息"""
try:
result = subprocess.run(["termux-telephony-deviceinfo"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"获取电话设备信息失败: {e.stderr}"}
@mcp.tool()
def get_location(provider: str = "gps") -> dict:
"""获取设备当前 GPS 定位 (provider: gps/network/passive)"""
try:
result = subprocess.run(
["termux-location", "-p", provider],
capture_output=True, text=True, check=True
)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"获取定位失败: {e.stderr}"}
@mcp.tool()
def get_camera_info() -> dict:
"""获取摄像头参数(支持的分辨率、闪光灯等)"""
try:
result = subprocess.run(["termux-camera-info"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"获取摄像头信息失败: {e.stderr}"}
@mcp.tool()
def record_audio(filename: str = "recording.wav", duration_sec: int = 5, sample_rate: int = 44100) -> str:
"""使用麦克风录制音频(WAV格式)"""
try:
subprocess.run(
["termux-microphone-record", "-f", filename, "-d", str(duration_sec), "-r", str(sample_rate)],
check=True, capture_output=True, text=True
)
return f"录音完成,保存为: {filename},时长{duration_sec}秒"
except subprocess.CalledProcessError as e:
return f"录音失败: {e.stderr}"
@mcp.tool()
def play_media(filepath: str) -> str:
"""播放指定的媒体文件(音频/视频)"""
try:
subprocess.Popen(["termux-media-player", "play", filepath])
return f"开始播放: {filepath}"
except Exception as e:
return f"播放失败: {str(e)}"
@mcp.tool()
def set_volume(stream: str = "music", volume: int = 5) -> str:
"""调整音量,stream: music/ring/alarm/notification,volume: 0-7"""
try:
subprocess.run(["termux-volume", stream, str(volume)], check=True, capture_output=True, text=True)
return f"{stream} 音量已设置为 {volume}"
except subprocess.CalledProcessError as e:
return f"调整音量失败: {e.stderr}"
@mcp.tool()
def set_brightness(brightness: int = 128) -> str:
"""设置屏幕亮度 (0-255)"""
try:
subprocess.run(["termux-brightness", str(brightness)], check=True, capture_output=True, text=True)
return f"屏幕亮度已设置为 {brightness}"
except subprocess.CalledProcessError as e:
return f"调整亮度失败: {e.stderr}"
@mcp.tool()
def show_toast(message: str, long: bool = False) -> str:
"""在屏幕上显示短暂的 Toast 提示"""
try:
cmd = ["termux-toast"]
if long:
cmd.append("-l")
cmd.append(message)
subprocess.run(cmd, check=True, capture_output=True, text=True)
return f"Toast 已显示: {message}"
except subprocess.CalledProcessError as e:
return f"显示 Toast 失败: {e.stderr}"
@mcp.tool()
def share_content(content: str, action: str = "send") -> str:
"""将文本或文件路径分享到其他 App。action 可选: send(发送), view(查看)"""
try:
subprocess.run(["termux-share", "-a", action, content], check=True, capture_output=True, text=True)
return f"已发起分享: {content}"
except subprocess.CalledProcessError as e:
return f"分享失败: {e.stderr}"
@mcp.tool()
def download_file_url(url: str, description: str = "", title: str = "") -> str:
"""通过系统下载管理器下载文件"""
try:
cmd = ["termux-download", url]
if description:
cmd += ["-d", description]
if title:
cmd += ["-t", title]
subprocess.run(cmd, check=True, capture_output=True, text=True)
return f"下载任务已添加: {url}"
except subprocess.CalledProcessError as e:
return f"添加下载失败: {e.stderr}"
@mcp.tool()
def toggle_torch(state: bool) -> str:
"""开关手电筒 (True 开, False 关)"""
try:
cmd = ["termux-torch"]
if state:
cmd.append("on")
else:
cmd.append("off")
subprocess.run(cmd, check=True, capture_output=True, text=True)
return f"手电筒已{'打开' if state else '关闭'}"
except subprocess.CalledProcessError as e:
return f"控制手电筒失败: {e.stderr}"
@mcp.tool()
def wifi_enable(state: bool) -> str:
"""开关 Wi-Fi (True 打开, False 关闭)"""
try:
cmd = ["termux-wifi-enable", "true" if state else "false"]
subprocess.run(cmd, check=True, capture_output=True, text=True)
return f"Wi-Fi 已{'开启' if state else '关闭'}"
except subprocess.CalledProcessError as e:
return f"操作失败: {e.stderr}"
@mcp.tool()
def get_wifi_connection_info() -> dict:
"""获取当前 Wi-Fi 连接的 SSID、BSSID、信号强度等信息"""
try:
result = subprocess.run(["termux-wifi-connectioninfo"], capture_output=True, text=True, check=True)
return json.loads(result.stdout)
except subprocess.CalledProcessError as e:
return {"error": f"获取 WiFi 信息失败: {e.stderr}"}
@mcp.tool()
def storage_get_file(file_type: str = "text/plain") -> str:
"""通过系统文件选择器获取文件(需 GUI 环境)"""
try:
result = subprocess.run(
["termux-storage-get", file_type],
capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
return f"获取文件失败: {e.stderr} (可能需要在图形界面下运行)"
@mcp.tool()
def remove_notification(notification_id: str) -> str:
"""根据通知ID移除已显示的通知"""
try:
subprocess.run(["termux-notification-remove", notification_id], check=True, capture_output=True, text=True)
return f"通知 {notification_id} 已移除"
except subprocess.CalledProcessError as e:
return f"移除通知失败: {e.stderr}"
# ======================= 通知读取工具(一次性快照修复版) =======================
@mcp.tool()
def get_current_notifications(wait_sec: float = 1.5) -> list[dict]:
"""
获取当前系统通知栏中的所有通知(一次性快照)。
直接调用 termux-notification-list,等待指定秒数后返回解析后的通知列表。
注意:需要在系统设置中授予 Termux 通知使用权(设置 → 通知使用权 → 允许 Termux)。
"""
try:
result = subprocess.run(
["termux-notification-list"],
capture_output=True,
text=True,
timeout=max(wait_sec, 1.0)
)
if result.stdout:
try:
notifications = json.loads(result.stdout)
except json.JSONDecodeError:
# 输出不是合法 JSON,可能是权限错误或空输出,检查 stderr
if result.stderr and ("permission" in result.stderr.lower() or "access" in result.stderr.lower()):
return [{"error": "通知权限未授予,请前往设置 → 应用 → 特殊应用权限 → 通知使用权,为 Termux 开启权限"}]
return []
# 检查返回的数组中是否包含权限错误信息
if isinstance(notifications, list):
for n in notifications:
if isinstance(n, dict) and "error" in n and "通知使用" in n.get("error", ""):
return [{"error": "通知权限未授予,请前往设置 → 应用 → 特殊应用权限 → 通知使用权,为 Termux 开启权限"}]
return notifications
else:
# 返回的不是数组,可能是单个对象,直接包装
return [notifications] if isinstance(notifications, dict) else []
return []
except subprocess.TimeoutExpired:
return [{"error": f"等待超时({wait_sec}秒),未获得通知数据"}]
except FileNotFoundError:
return [{"error": "termux-api 未正确安装,请执行 pkg install termux-api"}]
except Exception as e:
return [{"error": str(e)}]
# ======================= 文件操作工具 (Python 标准库) =======================
@mcp.tool()
def list_directory(path: str, detailed: bool = False) -> list:
"""列出指定目录内容(绝对路径)"""
try:
p = Path(path).absolute()
if not p.is_dir():
return [{"error": f"路径不是目录: {path}"}]
items = []
for entry in p.iterdir():
if detailed:
st = entry.stat()
items.append({
"name": entry.name,
"type": "dir" if entry.is_dir() else "file",
"size": st.st_size,
"modified": time.ctime(st.st_mtime),
"permissions": oct(st.st_mode)[-3:]
})
else:
items.append(entry.name + ("/" if entry.is_dir() else ""))
return items
except Exception as e:
return [{"error": str(e)}]
@mcp.tool()
def read_file(path: str, max_lines: int = 500) -> str:
"""读取文本文件内容(自动检测编码)"""
try:
file_path = Path(path).absolute()
if not file_path.is_file():
return f"错误: 文件不存在: {path}"
encodings = ['utf-8', 'latin-1', 'gbk']
for enc in encodings:
try:
with open(file_path, 'r', encoding=enc) as f:
lines = f.readlines()
break
except UnicodeDecodeError:
continue
else:
return "错误: 无法以文本格式读取该文件(可能是二进制文件)"
if max_lines and len(lines) > max_lines:
lines = lines[:max_lines]
lines.append(f"\n[... 仅显示前 {max_lines} 行 ...]")
return ''.join(lines)
except Exception as e:
return f"读取文件出错: {str(e)}"
@mcp.tool()
def create_file(path: str) -> str:
"""创建空文件(已存在则报错)"""
try:
file_path = Path(path).absolute()
if file_path.exists():
return f"错误: 文件或目录已存在: {path}"
file_path.touch()
return f"文件已创建: {file_path}"
except Exception as e:
return f"创建文件失败: {str(e)}"
@mcp.tool()
def write_file(path: str, content: str, overwrite: bool = False) -> str:
"""写入内容到文件(默认不覆盖)"""
try:
file_path = Path(path).absolute()
if file_path.exists() and not overwrite:
return f"错误: 文件已存在,如需覆盖请设置 overwrite=True"
mode = 'w' if overwrite or not file_path.exists() else 'x'
with open(file_path, mode, encoding='utf-8') as f:
f.write(content)
return f"内容已写入: {file_path}"
except Exception as e:
return f"写入文件失败: {str(e)}"
@mcp.tool()
def append_file(path: str, content: str) -> str:
"""向文件末尾追加内容"""
try:
file_path = Path(path).absolute()
with open(file_path, 'a', encoding='utf-8') as f:
f.write(content)
return f"内容已追加到: {file_path}"
except Exception as e:
return f"追加文件失败: {str(e)}"
@mcp.tool()
def copy_file(source: str, destination: str) -> str:
"""复制文件或目录"""
try:
src = Path(source).absolute()
dst = Path(destination).absolute()
if not src.exists():
return f"错误: 源路径不存在: {source}"
if src.is_dir():
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
return f"复制成功: {source} -> {destination}"
except Exception as e:
return f"复制失败: {str(e)}"
@mcp.tool()
def move_file(source: str, destination: str) -> str:
"""移动文件或目录"""
try:
src = Path(source).absolute()
dst = Path(destination).absolute()
if not src.exists():
return f"错误: 源路径不存在: {source}"
shutil.move(str(src), str(dst))
return f"移动成功: {source} -> {destination}"
except Exception as e:
return f"移动失败: {str(e)}"
@mcp.tool()
def delete_file(path: str) -> str:
"""删除文件或目录(危险操作,不可恢复)"""
try:
target = Path(path).absolute()
if not target.exists():
return f"错误: 路径不存在: {path}"
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
return f"已删除: {path}"
except Exception as e:
return f"删除失败: {str(e)}"
@mcp.tool()
def rename_file(path: str, new_name: str) -> str:
"""重命名文件或目录"""
try:
old = Path(path).absolute()
if not old.exists():
return f"错误: 路径不存在: {path}"
new = old.parent / new_name
old.rename(new)
return f"重命名成功: {path} -> {new}"
except Exception as e:
return f"重命名失败: {str(e)}"
@mcp.tool()
def stat_file(path: str) -> dict:
"""查看文件或目录属性"""
try:
p = Path(path).absolute()
if not p.exists():
return {"error": f"路径不存在: {path}"}
s = p.stat()
return {
"name": p.name,
"path": str(p),
"type": "目录" if p.is_dir() else "文件",
"size_bytes": s.st_size,
"permissions": oct(s.st_mode)[-3:],
"last_modified": time.ctime(s.st_mtime),
"last_accessed": time.ctime(s.st_atime),
"is_symlink": p.is_symlink(),
}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def chmod_file(path: str, mode: str) -> str:
"""修改文件权限(八进制,如 '755')"""
try:
target = Path(path).absolute()
if not target.exists():
return f"错误: 路径不存在: {path}"
mask = int(mode, 8)
os.chmod(target, mask)
return f"权限已修改为 {mode}: {path}"
except Exception as e:
return f"修改权限失败: {str(e)}"
@mcp.tool()
def make_directory(path: str, parents: bool = True) -> str:
"""创建目录"""
try:
p = Path(path).absolute()
if parents:
p.mkdir(parents=True, exist_ok=True)
else:
p.mkdir()
return f"目录已创建: {p}"
except Exception as e:
return f"创建目录失败: {str(e)}"
@mcp.tool()
def compress_files(source_path: str, zip_path: str) -> str:
"""压缩为 zip"""
try:
src = Path(source_path).absolute()
dst = Path(zip_path).absolute()
if not src.exists():
return f"错误: 源路径不存在: {source_path}"
with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as zf:
if src.is_dir():
for root, dirs, files in os.walk(src):
for f in files:
file_path = os.path.join(root, f)
arcname = os.path.relpath(file_path, src.parent)
zf.write(file_path, arcname)
else:
zf.write(src, src.name)
return f"压缩成功: {source_path} -> {zip_path}"
except Exception as e:
return f"压缩失败: {str(e)}"
@mcp.tool()
def decompress_file(zip_path: str, destination: str = ".") -> str:
"""解压 zip"""
try:
zip_file = Path(zip_path).absolute()
dest_dir = Path(destination).absolute()
if not zip_file.is_file():
return f"错误: zip文件不存在: {zip_path}"
dest_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_file, 'r') as zf:
zf.extractall(dest_dir)
return f"解压完成: {zip_path} -> {destination}"
except zipfile.BadZipFile:
return "错误: 文件不是有效的 zip 格式"
except Exception as e:
return f"解压失败: {str(e)}"
@mcp.tool()
def upload_file(path: str, base64_content: str) -> str:
"""通过 Base64 上传文件(小文件)"""
try:
import base64
file_path = Path(path).absolute()
binary_data = base64.b64decode(base64_content)
with open(file_path, 'wb') as f:
f.write(binary_data)
return f"文件已上传: {path}({len(binary_data)} 字节)"
except Exception as e:
return f"上传失败: {str(e)}"
@mcp.tool()
def download_file(path: str) -> str:
"""下载文件为 Base64 编码(<10MB)"""
try:
import base64
file_path = Path(path).absolute()
if not file_path.is_file():
return f"错误: 文件不存在: {path}"
size = file_path.stat().st_size
if size > 10 * 1024 * 1024:
return f"错误: 文件过大({size} 字节),超过 10MB 限制"
with open(file_path, 'rb') as f:
data = f.read()
return base64.b64encode(data).decode('utf-8')
except Exception as e:
return f"下载出错: {str(e)}"
# ======================= 命令行工具(高风险,智能权限) =======================
@mcp.tool()
def exec_termux_command(command: str, timeout_sec: int = 30) -> str:
"""
⚠️ 权限控制规则:
1. 每次执行前必须询问,提供4个选项:
- “仅本次允许”:仅此一条
- “一直允许”:后续不再询问,直接执行
- “拒绝”:不执行
- “自定义授权”:用户当场口述任意规则(如次数、目录、命令白名单、时间限制、任务范围等),后续命令直接按规则执行,不再询问。额度用完/用户取消则自动恢复询问。
2. 选择“一直允许”后,后续无需再问。
3. 选择“仅本次允许”或“拒绝”后,下次仍需重新询问。
4. 选择“自定义授权”或者用户有自定义意向的,请遵守自定义规则。
5. 未经明确表态,禁止执行。
参数:
command: 要执行的 shell 命令(字符串)
timeout_sec: 超时秒数(最大 60 秒)
"""
timeout_sec = min(timeout_sec, 60) # 安全上限
try:
result = subprocess.run(
command, shell=True, capture_output=True, text=True, timeout=timeout_sec
)
output = f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
if len(output) > 8000:
output = output[:8000] + "\n...(输出已截断)"
return output
except subprocess.TimeoutExpired:
return f"命令执行超时(>{timeout_sec}秒),已强制终止: {command}"
except Exception as e:
return f"执行命令时发生异常: {str(e)}"
# ======================= 启动服务 =======================
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Termux 全能 MCP Server")
parser.add_argument("--port", type=int, default=3000, help="服务端口")
parser.add_argument("--host", type=str, default="0.0.0.0", help="监听地址")
args = parser.parse_args()
print(f"""
╔══════════════════════════════════════════════╗
║ 📱 Termux 全能 MCP Server (官方SDK版) 已启动 ║
║ ║
║ SSE 端点: http://{args.host}:{args.port}/sse ║
║ 消息端点: http://{args.host}:{args.port}/messages/ ║
║ ║
║ 已加载工具: 电池/传感器/文件/通讯/命令… ║
║ 按 Ctrl+C 停止服务 ║
╚══════════════════════════════════════════════╝
""")
app = mcp.sse_app()
uvicorn.run(app, host=args.host, port=args.port, log_level="warning")