AIcard 项目开发手册(预览版-大部分尚未实现,骨架模块都已完善)
[b]保持创意、一个想法,一个逻辑的步骤。本工具不再更新,骨架因为已经实现,自定义方法靠使用者的创意,继续加油补全功能,根据心中所想自由定义一种高效的流程定义
目前浏览器开发注意控制台日志:投入正常使用记得注释!
万级客户端需要中继节点,共享WS流,转发心跳(能者加油)!
AIcard-p.rar
(151.22 KB, 下载次数: 60)
含油猴远端注入脚本-实际手册(补).rar
(21.36 KB, 下载次数: 16)
注入方法
以下是MiniMax 总结的所有历程和细节。
版本:v0.1.0-preview
最后更新:2026年2月
贡献者(按模块署名)
| 模块 |
贡献者 |
职责 |
| DeepSeek |
DeepSeek |
SSE拦截、流解析、自动发送 |
| 豆包 |
豆包 |
SSE拦截、流解析、自动发送 |
| 腾讯元宝 |
腾讯元宝 |
SSE拦截、流解析、自动发送 |
| 通义千问 |
通义千问 |
SSE拦截、流解析、自动发送 |
| 服务端 |
服务端 |
WebSocket通信、命令解析、心跳维持 |
| 流控模块 |
流控模块 |
流程状态管理、自动发送判断 |
| 文档 |
MiniMax |
手册编写与维护 |
目录
| 章节 |
标题 |
内容概要 |
| 一 |
项目概述 |
系统定位、核心设计理念 |
| 二 |
项目目录结构 |
客户端、服务端目录说明 |
| 三 |
架构设计 |
整体架构、适配器设计、流控机制 |
| 四 |
WebSocket 通信协议 |
连接端点、消息类型、心跳机制 |
| 五 |
命令解析系统 |
命令格式、解析器、处理器 |
| 六 |
SSE 流拦截机制 |
拦截器设计、各平台实现 |
| 七 |
自动发送机制 |
输入模拟、按钮触发 |
| 八 |
平台模块详解 |
DeepSeek、豆包、元宝、千问 |
| 九 |
流控上报机制 |
FlowRole、数据结构、状态管理 |
| 十 |
服务端模块 |
WebSocket 处理器、命令解析 |
| 十一 |
客户端核心模块 |
入口、配置、网络通信 |
| 十二 |
使用指南 |
快速启动、角色卡片、测试方法 |
| 十三 |
命令速查表 |
所有命令格式与示例 |
| 十四 |
高级主题 |
AI Manager、MCP 拓展、技术债务 |
| 十五 |
核心愿景与待完善大方向 |
AI Manager Agent、并行串行架构、性能优势 |
一、项目概述
AIcard 是一个多平台 AI 对话助手自动化注入系统,支持在 DeepSeek、豆包、腾讯元宝、通义千问等主流 AI 聊天平台上自动注入脚本,实现对话内容拦截、命令解析执行、自动回复等功能。系统采用前后端分离架构,前端通过浏览器注入的方式拦截各平台的 SSE 流式响应,后端提供 WebSocket 实时通信和命令解析服务。
项目的核心设计理念是将不同平台的对话接口统一抽象,通过适配器模式处理各平台差异化的 SSE 格式,同时建立统一的命令解析机制,支持在对话过程中执行系统命令并将结果自动发送回对话上下文。这种设计使得系统具有良好的扩展性,新增平台支持只需编写对应的 SSE 适配器即可。
二、项目目录结构
2.1 客户端目录结构
客户端代码位于 client-in-JS-Dev/src 目录下,采用模块化组织方式。核心入口文件为 core/entry.ts,负责根据当前访问的域名自动加载对应的平台模块。模块目录 modules/ 下包含四个主要平台的实现,每个平台通常包含三个核心文件:index.ts 作为模块入口负责初始化和事件监听,sse-adapter.ts 负责拦截和解析 SSE 流式响应,auto-send.ts 负责模拟用户输入和自动发送消息。
网络通信模块位于 net/ 目录下,sws-client.ts 实现 WebSocket 单例客户端,提供消息发送和监听功能;flowrole.ts 实现流控上报模块,负责将 SSE 流状态同步到服务端;sse.ts 提供通用的 XHR/Fetch 拦截器注册机制。配置文件 config/index.ts 集中管理各类连接参数和选项。
client-in-JS-Dev/
├── src/
│ ├── core/
│ │ └── entry.ts # 入口文件,自动检测平台并加载模块
│ ├── modules/
│ │ ├── deepseek/ # DeepSeek 平台实现
│ │ │ ├── index.ts # 模块入口
│ │ │ ├── sse-adapter.ts # SSE 拦截器
│ │ │ └── auto-send.ts # 自动发送实现
│ │ ├── doubao/ # 豆包平台实现
│ │ ├── yuanbao/ # 腾讯元宝平台实现
│ │ └── qianwen/ # 通义千问平台实现
│ ├── net/
│ │ ├── sws-client.ts # WebSocket 客户端
│ │ ├── flowrole.ts # 流控上报
│ │ └── sse.ts # SSE 拦截器注册
│ └── config/
│ └── index.ts # 配置文件
├── dist/ # 构建输出目录
└── role-cards/ # 角色卡片配置
2.2 服务端目录结构
服务端代码位于 server/ 目录下,采用 Python 实现。main.py 是启动入口,同时启动 HTTP 静态文件服务器和 WebSocket 服务器。router.py 负责 WebSocket 路由分发,将不同路径的连接分发到对应的处理器。
handlers/ 目录包含 WebSocket 消息处理器,realtime.py 是核心处理器,负责接收前端上报的对话数据、解析命令、返回执行结果;heartbeat.py 提供心跳维持功能。command_parser/ 目录实现命令解析系统,采用流式累积判断器模式,通过 patterns.json 定义命令格式,handlers/ 目录下实现具体的处理器。
flowrole/ 目录实现流控抽象类,包含 SSE 流控角色、命令解析流控角色、自动发送流控角色等,用于智能判断是否应该触发自动发送。integrations/ 目录包含与客户端流控模块的集成实现。
server/
├── main.py # 服务启动入口
├── router.py # WebSocket 路由
├── http_entry.py # HTTP 服务器
├── handlers/
│ ├── realtime.py # 实时消息处理
│ └── heartbeat.py # 心跳处理
├── command_parser/
│ ├── parser.py # 命令解析器
│ ├── patterns.json # 命令格式配置
│ └── handlers/
│ └── noshow.py # 静默执行处理器
└── flowrole/
├── __init__.py # 流控核心类
└── integrations/
└── sse_handler.py # SSE 流控集成
三、架构设计
3.1 整体架构
系统采用分层架构设计,从上到下依次为:平台适配层、消息处理层、通信传输层、命令解析层。平台适配层负责对接各个 AI 对话平台,拦截并解析其 SSE 流式响应;消息处理层负责数据的标准化处理和流转;通信传输层通过 WebSocket 实现前后端实时双向通信;命令解析层提供命令识别和执行能力。
各层之间通过事件机制进行通信,当 SSE 适配器捕获到新数据时,会通过 RealtimeClient 发送到服务端;服务端处理完成后返回结果,前端各模块通过监听 WebSocket 消息来接收和处理这些结果。这种松耦合的设计使得各模块可以独立开发和测试,只需遵循约定的消息格式即可。
3.2 平台适配器设计
每个平台的 SSE 适配器都遵循统一的设计模式:以单例模式实现,通过拦截 XHR 或 Fetch 来获取流式响应数据。适配器内部维护状态跟踪变量,用于检测增量内容、处理流结束事件。拦截器通过 registerInterceptor 函数注册,由 sse.ts 提供的统一框架管理。
适配器的主要职责包括:识别目标平台的 SSE 端点 URL、解析特定格式的 SSE 事件、提取文本增量内容、维护流状态信息、将数据发送到后端。不同平台的差异主要体现在 SSE 格式上,例如 DeepSeek 使用 data: 前缀加 JSON 格式、豆包使用 event: 和 data: 分行格式、通义千问使用 event: 和 data: 伴随 JSON 数据等。
3.3 流控上报机制
流控上报模块负责将 SSE 流的生命周期事件同步到服务端,使服务端能够智能判断是否应该触发自动回复。模块通过 flowRoleReporter 单例实现,配置了各平台的 SSE 事件映射关系。
当检测到 SSE 流开始时,上报 flow_start 事件,携带 message_id 用于追踪;流传输过程中上报 status_update 事件,携带 quasi_status、status、fragment_count 等状态信息;流结束时上报 flow_end 事件。服务端根据这些信息结合 should_auto_send() 逻辑判断是否需要自动回复。
四、WebSocket 通信协议
4.1 连接端点
系统提供两个 WebSocket 端点:ws://localhost:8027/realtime 用于实时消息传输,ws://localhost:8027/heartbeat 用于心跳维持。客户端通过 RealtimeClient 单例连接 realtime 端点,通过 HeartbeatClient 连接 heartbeat 端点。
服务端 router.py 根据 WebSocket 连接的路径将请求分发到对应的处理器。realtime 处理器处理所有与对话相关的信息交互,包括接收前端数据、解析命令、返回结果等;heartbeat 处理器定期发送心跳包维持连接。
4.2 客户端发送消息类型
客户端通过 RealtimeClient.send() 向上发送消息,主要包含以下类型:
对话数据消息:当 SSE 适配器捕获到新的对话内容时,发送 { type: 'deepseek_data', data: '文本内容', timestamp: 13... } 格式的消息。data 字段为提取出的文本增量内容,timestamp 为捕获时间戳,instanceId 标识来源适配器。
流状态消息:当流开始、结束或状态更新时,通过 flow_report 类型上报:{ type: 'flow_report', flow_id: 'sse_xxx', message_id: 123, site: 'deepseek', quasi_status: 'FINISHED', status: 'complete' }。flow_id 是服务端生成的流程标识,message_id 是各平台的消息标识,quasi_status 和 status 反映流的完成状态。
心跳消息:客户端主动发送的心跳包,用于维持连接活跃状态。
4.3 服务端返回消息类型
服务端通过 handlers/realtime.py 向客户端发送消息,主要类型包括:
初始化消息:客户端首次连接时收到 { type: 'init', script_url: 'http://localhost:8028/entry.js', message: '连接成功' },告知客户端脚本 URL。
命令执行结果:当解析并执行命令后,返回 { type: 'cmd_result', result: { success: true, output: '命令输出' }, command: 'echo hello' }。result.success 表示是否执行成功,result.output 是命令的标准输出,command 是原始命令内容。
确认消息:收到客户端消息后回复 { type: 'ack', received: '消息类型' },表示消息已接收处理。
流控指令:服务端根据流状态判断需要自动发送时,发送 { type: 'flow_control', action: 'auto_send', reason: '普通模式完成' }。客户端收到后触发对应的自动发送逻辑。
4.4 消息监听与处理
前端通过 RealtimeClient.onMessage(handler) 方法注册消息监听器,监听器函数接收一个消息对象参数,可以根据消息的 type 字段进行类型判断和处理。建议在各平台模块的初始化阶段注册监听器,确保能够及时处理各类消息。
监听器应该快速处理消息并返回,避免长时间阻塞导致消息堆积。对于需要异步处理的操作,可以使用 setTimeout 或 Promise 机制延后处理。
4.5 心跳机制(预览版)
状态:部分实现,待完善
4.5.1 心跳架构概述
系统心跳机制采用双通道设计:
- heartbeat 通道:
ws://localhost:8027/heartbeat,用于纯心跳保活
- realtime 通道:
ws://localhost:8027/realtime,业务消息与心跳混合
| 当前实现状态: |
组件 |
状态 |
说明 |
| 服务端 heartbeat.py |
✅ 已完成 |
每30秒发送心跳包 |
| 客户端 sws-client.ts |
✅ 已完成 |
WebSocket 连接与重连 |
| 客户端 config/index.ts |
✅ 已完成 |
心跳配置定义 |
| 客户端 flowrole.ts |
⚠️ 部分完成 |
心跳定时器已定义,未启用 |
| FlowHeartbeat 数据结构 |
✅ 已完成 |
Python 端完整实现 |
| HeartbeatSender 发送器 |
✅ 已完成 |
异步队列发送机制 |
| 客户端心跳发送 |
❌ 未完成 |
客户端未主动发送心跳 |
| 心跳超时处理 |
❌ 未完成 |
缺少超时检测与响应 |
4.5.2 心跳配置
客户端心跳配置位于 client-in-JS-Dev/src/config/index.ts:
export const config = {
ws: {
url: 'ws://localhost:8027/heartbeat',
realtimeUrl: 'ws://localhost:8027/realtime'
},
client: {
heartbeatInterval: 5000, // 心跳间隔(毫秒)
reconnectDelay: 3000, // 重连延迟
connectionTimeout: 5000 // 连接超时
}
};
export const heartbeatConfig = {
roles: {
client: 'client', // 客户端
sse: 'sse', // SSE流
server: 'server', // 服务端
auto_send: 'auto_send' // 自动发送
},
steps: {
idle: 'idle', // 空闲
streaming: 'streaming', // 传输中
start: 'start', // 开始
done: 'done', // 完成
sending: 'sending', // 发送中
batch: 'batch' // 批次汇报
}
};
4.5.3 FlowHeartbeat 数据结构
服务端流心跳数据结构定义于 server/flowrole/__init__.py:
@dataclass
class FlowHeartbeat:
flow_id: str = '' # 流程ID
flow_role: str = 'sse' # 角色:sse/command/auto_send
step: str = 'idle' # 状态:start/streaming/done/sending/batch
cmd_count: int = 0 # 命令数量
result_count: int = 0 # 结果数量
pending: int = 0 # 待完成命令数
results: List[str] = [] # 结果列表
timestamp: str = '' # 时间戳
4.5.4 待完善功能
- 客户端主动心跳:客户端应定期向 heartbeat 通道发送心跳包
- 心跳超时检测:服务端应检测客户端心跳,超时则断开连接
- 流状态心跳同步:SSE 流状态变化时同步 FlowHeartbeat 到服务端
- 心跳响应确认:客户端收到心跳后应回复确认消息
- 断线自动重连:完善重连逻辑,避免频繁重连
4.5.5 心跳消息格式
服务端发送:
{
"type": "heartbeat",
"time": "2025-02-01T10:00:00.000000"
}
客户端发送(待实现):
{
"type": "heartbeat",
"flow_id": "sse_abc123",
"flow_role": "client",
"step": "idle",
"cmd_count": 0,
"result_count": 0,
"timestamp": 1738406400000
}
五、命令解析系统
5.1 命令格式定义
系统通过 command_parser/patterns.json 定义命令识别格式,当前支持的格式为静默执行模式:?<命令内容>!。其中 ?< 是起始标记,>! 是结束标记,命令内容位于两者之间。
{
"1": {
"name": "静默执行模式",
"check_count": 2,
"check_chars": "?<",
"entry_point": "noshow",
"end_chars": ">!"
}
}
check_count 指定触发检测所需的最小字符数,check_chars 是检测字符序列,entry_point 指定使用的处理器名称,end_chars 指定结束标记。
5.2 解析器工作流程
parser.py 中的 StreamParser 类实现流式累积判断器模式。当收到新的数据块时,首先将数据追加到缓冲区,然后调用 detect_pattern() 检测缓冲区开头是否匹配预定义的模式。如果匹配成功,则加载对应的处理器模块并将后续数据转发给该处理器处理。
处理器在识别到完整的命令格式后(以 >! 结尾),提取命令内容并执行,执行结果通过返回值传递回解析器。这种设计支持命令分片到达的场景,例如 SSE 流式传输中命令可能被拆分到多个数据包。
5.3 处理器实现
处理器位于 handlers/ 目录下,每个处理器是一个独立的 Python 文件,包含一个与处理器同名的类。例如 noshow.py 实现 Noshow 类,继承自流式处理器的基类,负责执行 shell 命令。
处理器类的核心方法是 feed(chunk),接收数据块并返回处理结果。当检测到命令结束时,返回包含执行结果的字典 { success: true, output: '命令输出' };否则返回 None 表示继续累积。
class Noshow:
def feed(self, chunk: str) -> dict:
self.buffer += chunk
if '>!' in self.buffer:
idx = self.buffer.index('>!')
cmd = self.buffer[2:idx].strip()
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return {'success': True, 'output': result.stdout + result.stderr}
return None
5.4 命令使用示例
在任意支持的 AI 平台对话中,输入以下格式的命令即可执行系统命令:
?<echo Hello World>!
系统会捕获这条消息,提取 echo Hello World 部分作为命令执行,然后将输出结果 Hello World 返回给对话。如果命令执行成功,输出内容会通过 WebSocket 发送到前端,前端模块(如千问)会自动将结果填入输入框并发送。
六、平台模块详解
6.1 DeepSeek 模块
DeepSeek 模块位于 modules/deepseek/ 目录下,采用 XHR 拦截方式捕获 SSE 流。模块入口文件 index.ts 定义 DeepSeekModule 类,在初始化时加载 SSE 适配器和自动发送拦截器。
SSE 适配器 sse-adapter.ts 拦截 /api/v0/chat/completion 端点,使用状态机模式处理 SSE 数据。适配器解析两种 JSON 格式:增量追加格式(p 为路径,o 为操作类型,v 为值)和批量更新格式。提取的文本内容通过 RealtimeClient.send({ type: 'deepseek_data', ... }) 发送到后端。
自动发送模块 auto-send.ts 实现占位符替换机制:先在输入框填入特殊占位符 <<<MCP_AUTO_SEND>>>,拦截请求体将其替换为真实内容,最后发送给服务端。这种方式避免了直接操作 DOM 可能触发的前端验证逻辑。
模块还监听 realTimeCommandsComplete 事件,当服务端返回命令执行结果后,自动将结果发送到对话输入框并触发发送。
6.2 豆包模块
豆包模块位于 modules/doubao/ 目录下,采用 Fetch 拦截方式。模块入口 index.ts 定义 DouBaoModule 类,初始化时加载 SSE 适配器。
SSE 适配器 sse-adapter.ts 拦截 /chat/completion 端点,处理四种 SSE 事件类型:FULL_MSG_NOTIFY 标识消息开始(记录 messageId)、CHUNK_DELTA 和 STREAM_CHUNK 包含文本增量、STREAM_MSG_NOTIFY 包含完整文本块。适配器通过 doubaoExtractText() 函数从复杂的 JSON 结构中提取文本内容。
豆包的 SSE 格式使用 event: 和 data: 分行格式,事件类型标识不同类型的数据包。适配器解析每行数据,提取事件类型和事件数据,分别调用 flowRoleReporter.feed() 上报状态。
6.3 腾讯元宝模块
元宝模块位于 modules/yuanbao/ 目录下,采用 XHR 拦截方式。模块入口 index.ts 定义 startYuanbaoModule() 函数初始化模块。
SSE 适配器 sse-adapter.ts 拦截 /api/chat 端点,处理三种 SSE 事件:meta 包含 messageId、text 包含文本内容、done 标识流结束。适配器使用 yuanbaoExtractText() 函数从 type: 'text' 的数据包中提取 msg 字段内容。
适配器维护 fullContent 累积完整内容,通过 RealtimeClient.send({ type: 'yuanbao_data', ... }) 发送增量数据。流结束时发送最终状态。
6.4 通义千问模块
千问模块位于 modules/qianwen/ 目录下,采用 Fetch 拦截方式。模块入口 index.ts 定义 startQianwenModule() 函数初始化模块,并设置命令结果监听器。
SSE 适配器 sse-adapter.ts 拦截 /api/v2/chat 端点,使用增量提取机制避免重复发送。适配器维护 lastContent 和 lastResid 状态变量,通过比较当前响应与上一次响应的内容差异,只提取增量部分发送到后端。
千问的 SSE 格式使用 event: message 和 data: 伴随 JSON 数据。JSON 结构中 communication.sessionid 标识会话,communication.resid 标识响应序号,data.messages[1].content 是 AI 回复内容,data.messages[1].status 标识完成状态。
模块还实现 qianwenAutoSend() 函数,提供 Promise 接口的自动发送功能。发送时先填充输入框,触发发送按钮点击,然后等待流结束并返回完整回复内容。
命令结果监听器监听 WebSocket 的 cmd_result 消息,当收到命令执行结果时,自动调用 qianwenAutoSend() 将结果发送到对话输入框,实现命令结果的自动回显。
七、事件系统
7.1 前端自定义事件
系统定义以下前端自定义事件,供模块间通信使用:
realTimeCommandsComplete:由 DeepSeek 模块触发,当服务端返回命令执行结果且所有结果收集完成时触发。监听器可以获取累积的命令结果并执行自动发送。
flow-auto-send:由流控上报模块触发,当服务端下发 action: 'auto_send' 的流控指令时触发。事件.detail 包含触发自动发送的站点名称 site。
commandResult:历史遗留事件,曾用于跨模块传递命令结果,现已统一使用 WebSocket 消息机制,新代码不应再使用此事件。
7.2 WebSocket 消息事件
WebSocket 消息是模块间通信的主要方式,所有模块通过 RealtimeClient.onMessage() 注册监听器。重要的消息类型包括:
cmd_result:服务端返回命令执行结果。监听器可以通过 msg.result.success 判断是否执行成功,通过 msg.result.output 获取命令输出。千问模块监听此消息实现命令结果的自动发送。
flow_control:服务端下发的流控指令。通过 msg.action 判断操作类型:auto_send 表示需要自动发送、pause 表示暂停流处理、resume 表示恢复流处理、wait 表示等待。
script:服务端返回的脚本内容。客户端请求脚本时收到此消息,包含完整的 JavaScript 代码。
7.3 SSE 事件映射
各平台的 SSE 事件通过 flowrole.ts 中的 SITE_CONFIGS 配置映射到流控上报类型:
DeepSeek 平台:ready → flow_start、finish → flow_end、data → status_update、close → flow_close。状态解析函数从 JSON 的 p 字段提取 quasi_status 和 status,从 v.fragments 提取 fragment_count。
豆包平台:FULL_MSG_NOTIFY → flow_start、SSE_REPLY_END → flow_end、CHUNK_DELTA 和 STREAM_CHUNK → status_update。状态解析函数从 meta.message_id 提取 quasi_status,从 patch_op 数组长度推断 fragment_count。
元宝平台:meta → flow_start、text → status_update、done → flow_end。状态解析函数从 messageId 提取 quasi_status,从 stopReason 提取 status。
千问平台:message → status_update、complete → flow_end。状态解析函数从 communication.sessionid 提取 quasi_status,从 communication.resid 提取 fragment_count。
八、自动发送机制
8.1 占位符替换方式
DeepSeek 模块采用占位符替换方式实现自动发送。流程如下:首先在输入框填入特殊占位符 <<<MCP_AUTO_SEND>>>,同时将真实内容存储到全局变量;拦截请求发送函数,在请求体中发现占位符时,将对应字段的内容替换为真实内容;请求发送后,清空全局变量,完成替换。
这种方式的优势是避免了直接操作 DOM 可能触发的前端验证逻辑,劣势是需要针对每个平台的请求体格式编写对应的替换逻辑。DeepSeek 的替换逻辑支持 prompt、messages、query、input 等多种字段名,会在 messages 数组中查找包含占位符的消息并替换其 content。
8.2 DOM 操作方式
千问模块采用直接的 DOM 操作方式实现自动发送。流程如下:通过 findQianwenInput() 查询选择器定位输入框元素;使用 simulateReactInput() 模拟 React 的 value setter 并触发 input/change 事件;通过 findQianwenSendButton() 定位发送按钮,移除 disabled 属性后触发点击事件。
千问的自动发送还提供 Promise 接口 qianwenAutoSend(content),调用后会等待 SSE 流结束并返回完整的回复内容。实现方式是生成唯一的 resultId,存储到全局对象;SSE 适配器在收到增量数据时累积内容,流结束时标记 completed;qianwenAutoSend() 轮询全局对象的状态,解析 Promise。
8.3 自动发送触发场景
自动发送在以下场景触发:
命令结果返回:当服务端执行完命令并返回结果时,监听 cmd_result 消息的模块(如千问)自动将结果发送到对话。
流控指令下发:服务端根据流状态判断需要自动回复时,下发 action: 'auto_send' 的流控指令,前端收到后调用 autoSendWithReplace('', 0) 或 qianwenAutoSend() 触发发送。
监听器触发:DeepSeek 模块监听 realTimeCommandsComplete 事件,当事件触发时将累积的命令结果发送到对话。
九、流控系统
9.1 流控角色定义
服务端 flowrole/__init__.py 定义三种流控角色:
SSEFlowRole:负责 SSE 流的生命周期管理。记录 quasi_status(准状态)和 status(最终状态),判断是否为 continue 模式(存在未完成的片段)。提供 should_auto_send() 方法智能判断是否应该自动发送。
CommandFlowRole:负责命令解析的生命周期管理。记录命令数量和结果数量,当所有命令都有结果时判断是否可以发送。
AutoSendFlowRole:负责自动发送的生命周期管理。记录发送状态和结果累积。
9.2 自动发送判断逻辑
should_auto_send() 方法的判断逻辑如下:
首先检查流是否已结束(step == 'done'),未结束的流不能触发自动发送。然后检查是否有待处理命令(pending_count > 0),有待处理命令时不能触发自动发送。
对于普通模式(is_continue_mode == false):quasi_status == 'FINISHED' 或 status == 'FINISHED' 时返回 true,表示流正常结束可以发送。
对于 continue 模式(is_continue_mode == true):quasi_status == 'FINISHED' 时返回 true(continue 完成可以发送),quasi_status == 'INCOMPLETE' 时返回 false(continue 未完成不能发送)。
9.3 流状态字段
流状态通过以下字段描述:
quasi_status:准状态,反映流的中间状态。对于 DeepSeek 是 p 字段包含 quasi_status 的值;对于豆包是 meta.message_id;对于元宝是 messageId;对于千问是 communication.sessionid。
status:最终状态,反映流的结束原因。对于 DeepSeek 是 p 字段包含 status 的值;对于元宝是 stopReason;对于千问是 data.status。
fragment_count:片段数量,反映流的分块情况。对于 DeepSeek 是 response.fragments 数组长度;对于豆包是 patch_op 数组长度;对于千问是 communication.resid。
initial_content_length:初始内容长度,反映流开始时的内容大小。用于判断是否为 continue 模式(初始内容不为空表示继续之前的对话)。
十、使用指南
10.1 快速开始
确保服务端已启动(python main.py),浏览器访问对应的 AI 平台(deepseek.com、doubao.com、yuanbao.tencent.com、qianwen.com),注入脚本会自动加载并初始化对应模块。打开浏览器控制台,看到 [MCP] 检测到 xxx 站点 的日志表示模块加载成功。
10.2 命令使用
在任意支持的 AI 平台对话中输入命令格式即可执行系统命令:
?<echo 当前时间: && date>!
命令执行结果会自动发送到对话(千问平台)或需要手动发送(DeepSeek 等平台)。命令支持任意 Shell 命令,包括管道、重定向等高级用法。
10.3 角色卡片
角色卡片配置位于 role-cards/ 目录,JSON 格式定义角色的名称、描述、系统提示词等。例如:
{
"name": "测试角色",
"description": "用于测试命令解析器的角色",
"system_prompt": "你是一个测试助手,请简单回复测试成功",
"test_command": "?<echo 测试>!"
}
10.4 调试命令
前端暴露以下全局调试函数:
window.mcpAutoSend(cmd):发送命令到 DeepSeek
window.mcpAutoSendWithReplace(text, delay):使用占位符方式发送文本
window.mcpTest(cmd):测试命令发送
window.mcpSendCmd(cmd):直接发送命令数据到服务端(不经过 SSE)
window.mcpFindButton():查找发送按钮元素
window.mcpSimulateInput(element, text):模拟输入
window.mcpForceEnable(btn):强制启用按钮
十一、扩展开发
11.1 新增平台支持
新增平台支持需要以下步骤:
-
在 modules/ 下创建新目录,包含 index.ts、sse-adapter.ts 和可选的 auto-send.ts。
-
在 sse-adapter.ts 中实现 SSE 适配器,继承通用的适配器模式。关键方法包括:isChatEndpoint(url) 判断目标 URL、extractText(data) 提取文本增量、installXHRInterceptor() 或 installFetchInterceptor() 安装拦截器。
-
在 flowrole.ts 的 SITE_CONFIGS 中添加平台配置,定义 SSE 事件到流控上报类型的映射关系,以及状态解析函数。
-
在 entry.ts 的 checkAndLoad() 函数中添加域名检测逻辑,匹配新平台的域名并加载模块。
11.2 新增命令处理器
新增命令处理器需要以下步骤:
-
在 server/command_parser/handlers/ 下创建 Python 文件,实现与处理器同名的类。类应实现 feed(chunk) 方法,在检测到完整命令格式时返回执行结果。
-
在 patterns.json 中添加处理器配置,指定 check_chars(触发检测的字符序列)、entry_point(处理器名称)、end_chars(结束标记)。
-
处理器类名应为处理器名称的首字母大写形式(例如 noshow 对应 Noshow 类),以便于自动加载。
11.3 新增消息类型
新增 WebSocket 消息类型需要前后端同步修改:
前端在 sws-client.ts 中发送新类型的消息,调用 RealtimeClient.send({ type: '新类型', ... })。
服务端在 handlers/realtime.py 的 handle() 函数中添加新类型的处理逻辑。
如果需要在其他模块中监听新消息,在对应模块的初始化代码中调用 RealtimeClient.onMessage() 注册监听器。
十二、常见问题
12.1 SSE 拦截不生效
检查浏览器控制台是否有 [MCP] 检测到 xxx 站点 的日志。如果没有,检查 entry.ts 中的域名匹配正则是否正确。确保脚本已正确注入,刷新页面重新加载。
如果日志正常但 SSE 数据未捕获,检查 SSE 适配器中的 URL 匹配条件是否正确,以及请求是否真的使用了 SSE 流式传输(部分接口可能返回普通 JSON)。
12.2 命令未识别
确保命令格式正确:?<命令内容>!,注意 ?< 和 >! 是英文标点且没有空格。检查 patterns.json 中的配置是否与命令格式匹配。
如果命令被拆分到多个数据包,确认流式解析器的缓冲区累积逻辑正确,命令结束标记 >! 应该能够正确识别。
12.3 自动发送失败
检查输入框元素选择器是否正确,不同平台或页面更新后选择器可能失效。检查按钮是否需要特定的点击事件序列(如 mousedown、mouseup、click)。
对于占位符替换方式,检查拦截器是否正确注册,请求体中是否真的包含占位符。可以在拦截器中添加日志确认。
12.4 WebSocket 连接失败
检查服务端是否正在运行,端口 8027 是否被占用。检查浏览器的网络面板,确认 WebSocket 连接尝试是否有错误。
如果是跨域问题,确认服务端的 WebSocket 处理器正确处理了 Origin 头。
十三、配置参考
13.1 服务端配置
服务端口在 main.py 中配置:
WS_PORT = 8027 # WebSocket 端口
HTTP_PORT = 8028 # HTTP 静态文件端口
CLIENT_PATH = r"D:\AIcard\client-in-JS-Dev\dist" # 前端构建输出目录
13.2 客户端配置
客户端配置在 config/index.ts 中:
export const config = {
ws: {
url: 'ws://localhost:8027',
realtimeUrl: 'ws://localhost:8027/realtime',
heartbeatUrl: 'ws://localhost:8027/heartbeat'
},
client: {
reconnectDelay: 3000, # WebSocket 重连延迟(毫秒)
}
}
13.3 SSE 事件配置
SSE 事件配置在 flowrole.ts 的 SITE_CONFIGS 数组中,每个平台的配置包含:
name:平台名称标识
sitePattern:域名匹配正则
events:SSE 事件到上报类型的映射数组
parseStatus:状态解析函数
onAutoSend:自动发送回调函数
十四、项目现状与未来规划
14.1 当前已实现功能
平台支持(4个):
- DeepSeek:XHR 拦截 + 占位符替换自动发送
- 豆包:Fetch 拦截 + 完整事件链自动发送 + 流结束等待
- 腾讯元宝:XHR 拦截 + Quill 编辑器自动发送
- 通义千问:Fetch 拦截 + 增量提取 + Promise 自动发送
命令系统:
- 协议格式:
?<命令>!(静默执行模式)
- 处理器:noshow(Shell 命令执行)
- 流式累积判断器模式,支持命令分片到达
流控系统:
- SSE 流生命周期跟踪
- 智能自动发送判断(普通模式 / Continue 模式)
- WebSocket 实时通信
事件机制:
- WebSocket 消息类型:cmd_result、flow_control、ack 等
- 前端自定义事件:realTimeCommandsComplete、flow-auto-send
14.2 待实现功能
流程管理(Flow Management):
- 目标:实现多命令串行/并行执行流程
- 功能点:流程定义、流程暂停/继续、流程分支、流程条件判断
- 状态:设计中
Agent 系统:
- 目标:支持 AI Agent 自主决策和执行
- 功能点:Agent 任务分解、工具调用、结果反思、上下文记忆
- 状态:未开始
AI 经理自动流程化:
- 目标:AI 自动编排复杂业务流程
- 功能点:自然语言流程描述、自动生成执行计划、异常处理与重试
- 状态:未开始
MCP 拓展(Model Context Protocol):
- 目标:支持标准 MCP 协议的外接工具
- 功能点:MCP Server 连接管理、工具发现与调用、资源访问
- 状态:未开始
14.3 拓展规划详述
14.3.1 流程管理设计草案
流程管理旨在解决多步骤任务的编排问题,将一系列命令或操作组织成可管理的流程。
流程定义:
# 伪代码示例
flow = Flow(
name="数据处理流程",
steps=[
Step("加载数据", cmd="?<python load_data.py>!"),
Step("数据分析", cmd="?<python analyze.py>!", depends_on="加载数据"),
Step("生成报告", cmd="?<python report.py>!", depends_on="数据分析"),
],
parallel=["步骤A", "步骤B"], # 可并行执行的步骤
condition="if result['success']" # 条件判断
)
状态管理:
- pending:等待执行
- running:执行中
- paused:暂停
- completed:完成
- failed:失败
- skipped:跳过
控制命令:
?<flow start 数据处理流程>! - 启动流程
?<flow pause>! - 暂停当前流程
?<flow resume>! - 恢复流程
?<flow stop>! - 终止流程
?<flow status>! - 查询流程状态
14.3.2 Agent 系统设计草案
Agent 系统赋予 AI 自主任务执行能力,支持工具选择、任务分解和结果反思。
Agent 结构:
class Agent:
def __init__(self, name: str, tools: List[Tool]):
self.name = name
self.tools = tools
self.memory = [] # 短期记忆
self.context = {} # 执行上下文
async def think(self, task: str) -> Plan:
"""任务分解与规划"""
pass
async def execute(self, plan: Plan) -> Result:
"""执行计划"""
pass
async def reflect(self, result: Result) -> Reflection:
"""结果反思与改进"""
pass
工具调用协议:
interface Tool {
name: string;
description: string;
parameters: Schema;
execute: (args: any) => Promise<Result>;
}
interface Plan {
steps: Step[];
reasoning: string; // 决策推理过程
confidence: number; // 置信度
}
Agent 命令:
?<agent create 我的助手 with tools=[文件读写,命令执行,网络搜索]>! - 创建 Agent
?<agent task 我的助手 分析这份日志>! - 分配任务
?<agent status>! - 查看 Agent 状态
?<agent memory>! - 查看 Agent 记忆
14.3.3 AI 经理自动流程化设计草案
AI 经理是一个基于命令反应式(Reactive)的流程编排系统,核心理念是命令驱动 + 事件响应 + 并行分发。与传统的静默执行不同,AI 经理强调命令的执行结果可以反应式地流向其他窗口端,实现多平台协同工作。
核心概念:
| 概念 |
说明 |
| 流程节点(Node) |
流程中的执行单元,可以是 Shell 命令、脚本或外部工具 |
| 数据流(Flow) |
节点之间的数据传递,基于事件驱动 |
| 反应式触发 |
命令执行结果自动触发下游节点,无需手动调用 |
| 并行分发 |
单个命令的输出可以同时发送到多个目标窗口 |
| 上下文传递 |
上下一个窗口的上下文自动继承上一个窗口的状态 |
架构设计:
┌─────────────────────────────────────────────────────────────────────────────┐
│ AI Manager 分布式调度架构 │
│ 核心愿景:通过单一窗口实现无限并行场景 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AI Manager (主调度中心) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ 任务分解器 │ │ 策略规划器 │ │ 流程编排引擎 │ │ │
│ │ │ Task Parser │ │ Strategy │ │ Flow Orchestrator │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 角色定义层 (Role Layer) │ │
│ │ │ │
│ │ 根据客户端ID定义角色: │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ client_id=001 │ │ client_id=002 │ │ client_id=003 │ │ │
│ │ │ 角色: 总指挥 │ │ 角色: 包工头 │ │ 角色: 包工头 │ │ │
│ │ │ Role: Master │ │ Role: Foreman │ │ Role: Foreman │ │ │
│ │ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ │ │
│ │ │ │ │ │ │
│ └───────────┼──────────────────┼──────────────────┼────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 最小信息卡分发层 (Info Card Layer) │ │
│ │ │ │
│ │ 最小信息卡格式: │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ { │ │ │
│ │ │ "task_id": "uuid", // 任务唯一标识 │ │ │
│ │ │ "parent_id": "uuid", // 父任务ID(可省略) │ │ │
│ │ │ "role": "analyzer", // 目标角色 │ │ │
│ │ │ "priority": 1, // 优先级 1-5 │ │ │
│ │ │ "payload": {...}, // 任务载荷 │ │ │
│ │ │ "callback": "url", // 结果回调地址 │ │ │
│ │ │ "timeout": 30000, // 超时时间(ms) │ │ │
│ │ │ "retry": 3 // 重试次数 │ │ │
│ │ │ } │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 包工头AI层 (Foreman AI Layer) │ │
│ │ 每个包工头负责管理一组Worker,并行分发子任务 │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Foreman-001 Foreman-002 │ │ │
│ │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │
│ │ │ │ Worker-A (DeepSeek)│ │ Worker-C (千问) │ │ │ │
│ │ │ │ Worker-B (豆包) │ │ Worker-D (元宝) │ │ │ │
│ │ │ │ Worker-C (Claude) │ │ Worker-E (Gemini) │ │ │ │
│ │ │ └──────────────────┘ └──────────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 执行结果聚合层 (Result Aggregation) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │DeepSeek │ │ 豆包 │ │ 千问 │ │ Claude │ │ Gemini │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │ │
│ │ └─────────────┴─────────────┴─────────────┴─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────┴──────────┐ │ │
│ │ │ 结果聚合引擎 │ │ │
│ │ │ Result Aggregator │ │ │
│ │ └─────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
用户自定义流程示例:
用户可以通过配置文件自由定义任意复杂度的并行流程:
# user-flow.yaml - 用户自定义流程
name: 跨平台深度分析流程
description: 通过单一窗口发起,同时在5个AI平台并行分析,结果自动聚合
# 流程定义
flow:
# 阶段1: 主调度发起任务(AI Manager)
- stage: master_dispatch
role: master
command: |
?<parallel to=[analyzer_001, analyzer_002, analyzer_003]
对以下技术文档进行深度分析:
{document}
分析维度:技术细节、架构设计、性能优化、安全性
>!
# 阶段2: 包工头分发子任务(Foreman AI)
- stage: foreman_split
role: foreman
rules: |
# 每个包工头将任务拆分为4个子任务
# 子任务1: 技术细节分析
# 子任务2: 架构设计评估
# 子任务3: 性能优化建议
# 子任务4: 安全性审查
# 阶段3: Worker并行执行
- stage: parallel_execution
concurrent: true
workers: [deepseek, doubao, qianwen, claude, gemini]
# 阶段4: 结果聚合
- stage: result_aggregation
role: aggregator
strategy: weighted_average
weights:
deepseek: 0.25
doubao: 0.25
qianwen: 0.2
claude: 0.15
gemini: 0.15
# 阶段5: 生成最终报告
- stage: final_report
role: master
command: |
?<to yuanbao: 综合分析结果 $aggregated_result>!
并行场景示例:
场景1: 单一窗口发起100个DeepSeek并行分析
?<manager create 100并行分析 from "
parallel to=[100个Worker节点]
每个节点执行: ?<deepseek 分析 $task_data>!
">!
场景2: 多层分包架构(主调度 -> 包工头 -> 工人)
?<manager create 三层架构 from "
master_dispatch: 分解任务到5个包工头
-> foreman_001: 分发到10个DeepSeek Worker
-> foreman_002: 分发到10个豆包 Worker
-> foreman_003: 分发到10个千问 Worker
-> aggregator: 收集所有结果
">!
场景3: 条件分支并行
?<manager create 智能分流 from "
check_task_type($data)
-> if 技术类 then parallel to=[deepseek, claude]
-> if 创意类 then parallel to=[doubao, midjourney]
-> if 分析类 then parallel to=[qianwen, gemini]
-> unified_result: 汇总输出
">!
核心优势:
| 特性 |
说明 |
| 单窗口无限并行 |
1个窗口可发起任意数量的并行任务 |
| 角色自由定义 |
根据ClientID灵活配置角色(总指挥/包工头/工人) |
| 最小信息卡 |
仅传递必要信息,减少网络开销 |
| 分层调度 |
主调度 -> 包工头 -> 工人,三级架构清晰可控 |
| 用户自定义 |
流程完全由用户配置文件定义 |
| 动态扩缩容 |
Worker数量可根据负载动态调整 |
命令模式详解:
AI 经理使用 ?<>! 语法扩展,支持更复杂的流程控制:
# 基础 Shell 命令(立即执行)
?<echo hello>!
# 变量定义与传递
?<set result = $(python analyze.py data.csv)>!
# 并行分发到多个窗口
?<parallel to=[deepseek,qianwen] echo "并行消息">!
# 跨窗口数据传递
?<from doubao: set data = $(python extract.py)>!
?<to yuanbao: echo $data | python process.py>!
命令实现状态:
| 命令模式 |
状态 |
说明 |
?<shell>! |
✅ 已实现 |
基础Shell命令执行 |
?<$var = $(shell)>! |
✅ 已实现 |
变量定义与命令结果捕获 |
?<parallel to=[a,b] cmd>! |
✅ 已实现 |
并行分发到多个目标 |
?<from platform: cmd>! |
✅ 已实现 |
从指定平台获取数据 |
?<to platform: cmd>! |
✅ 已实现 |
发送到指定平台 |
?<if cond then cmd>! |
🔶 预览 |
条件执行(待流程系统接入) |
?<for i in {1..n}>! |
🔶 预览 |
循环执行(待流程系统接入) |
?<cmd1>! → ?<cmd2>! |
🔶 预览 |
链式调用(待流程系统接入) |
?<manager create flow>! |
🔶 预览 |
流程管理(待流程系统接入) |
预览版命令示例(待流程系统完善后启用):
# 条件执行(待实现)
?<if $(echo $result | grep ERROR) then echo "有错误">!
# 循环执行(待实现)
?<for i in {1..5} do echo "第 $i 次">!
# 链式反应(待实现)
?<python step1.py>! → ?<python step2.py $RESULT>! → ?<python step3.py $RESULT>!
# 流程管理(待实现)
?<manager create 数据分析 from "python load.py -> python clean.py -> python analyze.py">!
流程定义语法(待实现):
流程可以通过命令序列定义,也可以通过 JSON/YAML 配置文件定义:
# flow.yaml - 数据分析流程示例
name: 数据分析流程
trigger: 手动触发
nodes:
- id: load_data
name: 加载数据
command: ?<python load_data.py --input data.csv>!
output_var: raw_data
- id: clean_data
name: 数据清洗
command: ?<python clean.py $raw_data>!
input_var: raw_data
output_var: clean_data
depends_on: load_data
- id: analyze
name: 数据分析
command: ?<python analyze.py $clean_data>!
input_var: clean_data
output_var: analysis_result
depends_on: clean_data
parallel_outputs: # 并行发送到多个平台
- deepseek: "请分析以下结果:$analysis_result"
- qianwen: "总结:$analysis_result"
- doubao: "简要说明:$analysis_result"
- id: report
name: 生成报告
command: ?<python report.py $analysis_result --output report.html>!
input_var: analysis_result
depends_on: analyze
- id: notify
name: 通知
command: ?<send_to_all 分析完成,报告已生成>!
depends_on: report
error_handling:
max_retries: 3
retry_delay: 1000
fallback: "发送告警消息"
反应式数据流:
数据在节点之间流动时采用发布-订阅模式:
# 发布数据
publish(event="node_result", data={"node": "analyze", "result": "..."})
# 订阅数据
subscribe(event="node_result", callback=on_result_received)
# 反应式链
react(
trigger="node_result",
condition=lambda d: d["node"] == "analyze",
action="notify"
)
跨窗口通信协议:
窗口之间的数据传递使用统一的协议格式:
{
"type": "flow_data",
"flow_id": "流程唯一ID",
"from_window": "doubao",
"to_window": ["deepseek", "qianwen"],
"data": {
"variable": "result",
"value": "分析结果内容",
"format": "text"
},
"metadata": {
"timestamp": 1234567890,
"node_id": "analyze",
"priority": "normal"
}
}
分布式执行模式:
AI 经理支持三种执行模式:
| 模式 |
说明 |
适用场景 |
| 本地模式 |
所有命令在本地 Shell 执行 |
简单脚本、文件操作 |
| 远程模式 |
命令分发到远程 Agent 执行 |
需要特定环境的任务 |
| 混合模式 |
部分本地、部分远程 |
复杂流程 |
Manager 命令集:
| 命令 |
功能 |
?<manager create 流程名 from 命令序列>! |
从命令序列创建流程 |
?<manager create 流程名 from file:flow.yaml>! |
从文件加载流程 |
?<manager run 流程名>! |
运行流程 |
?<manager run 流程名 --from-node step3>! |
从指定节点开始运行 |
?<manager pause 流程名>! |
暂停流程 |
?<manager resume 流程名>! |
恢复流程 |
?<manager stop 流程名>! |
终止流程 |
?<manager status 流程名>! |
查看流程状态 |
?<manager show 流程名>! |
显示流程结构图 |
?<manager export 流程名 to file:flow.json>! |
导出流程配置 |
?<manager list>! |
列出所有流程 |
?<manager delete 流程名>! |
删除流程 |
实际使用示例:
# 示例1:简单的本地数据分析流程
?<manager create 数据分析 from "python load.py -> python clean.py -> python analyze.py">!
# 示例2:带条件判断的流程
?<set threshold = 5>!
?<manager create 监控流程 from "
python check.py
-> if \$result > \$threshold then send_alert.py
-> log_result.py
">!
# 示例3:跨窗口并行分析
?<manager create 跨平台分析 from "
to:deepseek 分析这部分数据
to:qianwen 总结这份报告
to:doubao 生成图表
">!
# 示例4:反应式数据传递
?<from doubao: python extract.py | set raw_data>!
?<to yuanbao: python process.py $raw_data>!
?<to deepseek: python analyze.py $raw_data>!
# 示例5:监控并自动响应
?<manager create 自动运维 from "
check_system.py
-> if \$exit_code != 0 then alert.py
-> log_status.py
-> restart_service.py
" auto_retry=3>!
事件与回调:
# 事件类型
EVENTS = {
"flow_start": "流程开始",
"flow_pause": "流程暂停",
"flow_resume": "流程恢复",
"flow_stop": "流程终止",
"flow_complete": "流程完成",
"node_start": "节点开始",
"node_complete": "节点完成",
"node_error": "节点错误",
"data_published": "数据发布",
"data_received": "数据接收",
}
# 回调示例
def on_flow_complete(flow_id, results):
send_notification(f"流程 {flow_id} 完成,结果: {results}")
def on_node_error(node_id, error):
log_error(f"节点 {node_id} 错误: {error}")
retry_node(node_id)
状态追踪与持久化:
# 流程状态
class FlowState:
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
PAUSED = "paused" # 暂停
COMPLETED = "completed" # 完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 取消
# 持久化存储
storage = {
"flows": {}, # 流程定义
"executions": {}, # 执行记录
"variables": {}, # 全局变量
"checkpoints": {} # 检查点(用于恢复)
}
14.3.4 MCP 拓展设计草案
MCP(Model Context Protocol)是 Anthropic 提出的标准协议,用于 AI 模型与外部工具的连接。
MCP 架构:
┌─────────────┐ MCP ┌─────────────┐
│ AIcard │◄────────────►│ MCP Server │
│ Client │ JSON-RPC │ (工具提供者) │
└─────────────┘ └─────────────┘
MCP Server 配置:
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
"env": {}
},
"postgres": {
"command": "uvx",
"args": ["mcp-server-postgres", "postgresql://user:pass@localhost/db"]
}
}
}
MCP 工具调用:
# 服务端 MCP 处理器
class MCPHandler:
async def handle_request(self, method: str, params: dict) -> any:
if method == "tools/list":
return self.list_tools()
elif method == "tools/call":
return self.call_tool(params["name"], params["arguments"])
MCP 集成命令:
?<mcp list>! - 列出可用 MCP 工具
?<mcp use filesystem read_file path=/data/test.txt>! - 使用 MCP 工具
?<mcp config add server_name command args...>! - 配置 MCP Server
14.4 技术债务与优化方向
当前技术债务:
- 重复代码:各平台 SSE 适配器有相似逻辑,可抽象公共基类
- 测试缺失:缺少单元测试和集成测试
- 错误处理:部分错误场景未完善处理
- 文档不全:API 文档和注释需要补充
优化方向:
- 插件架构:支持运行时加载/卸载平台模块
- 配置外部化:将平台配置和选择器移到外部配置文件
- 日志系统:统一的日志收集和分析
- 性能优化:减少不必要的内存分配和事件触发
14.5 参与贡献
欢迎参与项目开发!贡献方式包括:
报告问题:
- 在 GitHub Issues 中报告 Bug
- 提供复现步骤和环境信息
提交代码:
- Fork 项目并创建分支
- 编写清晰的 Commit Message
- 提交 Pull Request
改进文档:
功能建议:
- 在 Issues 中提出功能需求
- 参与设计讨论
- 贡献原型代码
十五、核心愿景与待完善大方向
15.1 AI Manager Agent 自动化系统
项目核心定位:AIcard 的终极目标是构建一个智能化的 AI Manager Agent 系统,能够在多个 AI 平台之间自动流转、智能决策、并行处理复杂任务。这不是一个简单的脚本注入工具,而是一个具备自主思考能力的 AI 自动化中枢。
AI Manager 的核心能力:
| 能力模块 |
描述 |
成熟度 |
| 任务分解 |
将复杂需求拆解为可执行的原子任务 |
待开发 |
| 平台调度 |
根据任务类型自动选择最优 AI 平台 |
已完成基础 |
| 上下文传递 |
跨窗口传递对话上下文和变量 |
已完成基础 |
| 智能决策 |
根据返回结果动态调整执行策略 |
待开发 |
| 错误恢复 |
自动检测失败并尝试替代方案 |
待开发 |
| 资源编排 |
串行与并行的智能混合调度 |
已完成基础 |
Agent 架构设计:
┌─────────────────────────────────────┐
│ AI Manager Agent │
│ ┌───────────────────────────────┐ │
│ │ 任务解析器 (Task Parser) │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ 决策引擎 (Decision Engine) │ │
│ │ - 平台选择策略 │ │
│ │ - 执行路径规划 │ │
│ │ - 资源分配决策 │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ 执行控制器 (Execution Ctrl) │ │
│ │ - 串行调度器 │ │
│ │ - 并行调度器 │ │
│ │ - 状态追踪器 │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
│ │ │
┌────────────────┘ │ └────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ DeepSeek │ │ 豆包 │ │ 腾讯元宝 │
│ Agent │ │ Agent │ │ Agent │
└─────────────────┘ └─────────────────┘ └─────────────────┘
15.2 并行与串行混合架构
架构设计原则:
AIcard 的执行架构支持三种核心模式,能够根据任务特性智能选择最优执行策略:
执行模式矩阵:
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ 串行模式 (Sequential) 并行模式 (Parallel) │
│ ──────────────────── ────────────────── │
│ │
│ Step 1 ──► Step 2 ──► Step 3 ┌─────────┐ ┌─────────┐ │
│ │ Task A │ │ Task B │ │
│ 适用场景: └────┬────┘ └────┬────┘ │
│ - 有前后依赖 │ │ │
│ - 需要顺序执行 └─────┬─────┘ │
│ - 资源敏感操作 ▼ │
│ ┌─────────┐ │
│ │ Result │ │
│ │ Combine │ │
│ └─────────┘ │
│ │
│ 混合模式 (Hybrid) │
│ ───────────────── │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Phase 1: 并行执行独立任务 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ A1 │ │ A2 │ │ A3 │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ └───────┼────────────┼────────────┼──────────┘ │
│ └────────────┬────────────┘ │
│ ▼ │
│ Phase 2: 串行执行依赖任务 │
│ ┌─────────────────────────────────────────┐ │
│ │ B1 ──► B2 ──► B3 ──► B4 │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
并行分发协议:
# 基本并行分发
?<parallel to=[deepseek,qianwen,doubao] 分析这段数据>!
# 带条件并行
?<parallel to=[deepseek,qianwen]
deepseek: 深度分析技术细节
qianwen: 总结概述与结论
>!
# 跨平台数据聚合
?<from doubao: python extract.py>!
?<parallel to=[deepseek: analyze $data, qianwen: summarize $data]>!
?<to yuanbao: 整合分析结果 $combined_result>!
串行链式调用:
# 命令链
?<python extract.py -> python transform.py -> python load.py>!
# 带变量传递
?<set raw = $(python fetch.py)>!
?<python process.py $raw -> python analyze.py $raw -> python report.py $raw>!
# 条件分支
?<manager create 智能流程 from "
check_status.py
-> if \$exit_code == 0 then continue.py
-> else then rollback.py
-> finally cleanup.py
">!
15.3 命令协议性能优势
设计理念:AIcard 的命令协议采用极简设计,在保证功能完整性的前提下,追求最快的解析速度和最小的资源占用。
性能优势对比:
| 维度 |
AIcard 协议 |
传统方案 |
优势说明 |
| 协议体积 |
~10 字节开销 |
~100+ 字节 |
减少 90%+ |
| 解析复杂度 |
O(n) 单次扫描 |
O(n) 正则匹配 |
无需回溯 |
| 内存占用 |
固定缓冲区 |
动态字符串拼接 |
确定性内存 |
| 触发延迟 |
< 1ms |
10-50ms |
降低 90%+ |
| CPU 占用 |
极低(位运算) |
中等(正则引擎) |
降低 80%+ |
| 并发支持 |
无锁设计 |
需要锁机制 |
天然并发安全 |
协议格式设计:
最小化标记集:
?< 命令起始标记 (2字节)
>! 命令结束标记 (2字节)
-> 串行链接 (2字节)
| 管道传递 (1字节)
$ 变量引用 (1字节)
& 后台执行 (1字节)
流式累积判断器模式(已实现):
# 核心算法:单次扫描完成模式检测
def detect_pattern(buffer: str) -> tuple[bool, str, str]:
# 只需检查缓冲区开头 2 个字符
if buffer[:2] == '?<':
return (True, '静默执行', 'noshow')
return (False, None, None)
为什么最快:
- 无正则表达式:使用确定性有限自动机(DFA),避免正则引擎的回溯开销
- 固定标记长度:所有标记均为固定长度,无需长度推断
- 原地操作:缓冲区原地修改,避免字符串复制
- 零分配设计:心跳对象预分配,运行时无内存分配
- 事件驱动:基于回调的非阻塞设计,避免线程上下文切换
资源占用实测(预估):
| 操作 |
内存占用 |
CPU 占用 |
说明 |
| 空闲状态 |
~100KB |
0% |
仅维持 WebSocket 连接 |
| 单次命令解析 |
+10KB |
< 1ms |
临时缓冲区 |
| SSE 拦截 |
+50KB |
< 5% |
拦截器开销 |
| 并行分发 |
+20KB/node |
线性增长 |
每节点独立缓冲区 |
15.4 待完善功能清单
核心模块完善优先级:
| 优先级 |
功能 |
预计工作量 |
依赖项 |
| P0 |
心跳机制完善 |
2 天 |
无 |
| P0 |
错误恢复系统 |
3 天 |
心跳机制 |
| P1 |
AI Manager 任务分解 |
5 天 |
命令解析器 |
| P1 |
决策引擎 |
5 天 |
任务分解 |
| P1 |
跨平台变量传递 |
2 天 |
WebSocket 路由 |
| P2 |
图形化流程设计器 |
10 天 |
前端框架 |
| P2 |
持久化存储 |
3 天 |
数据库 |
| P3 |
MCP 协议完整实现 |
5 天 |
MCP Server |
| P3 |
分布式部署 |
7 天 |
消息队列 |
P0 优先级(当前冲刺):
## 心跳机制完善
- [ ] 客户端主动心跳发送
- [ ] 心跳超时检测与响应
- [ ] 流状态心跳同步
- [ ] 断线自动重连优化
## 错误恢复系统
- [ ] 命令执行超时检测
- [ ] 失败自动重试机制
- [ ] 备用平台 fallback
- [ ] 错误状态持久化
P1 优先级(下一阶段):
## AI Manager 基础
- [ ] 自然语言任务解析
- [ ] 任务依赖图构建
- [ ] 执行路径自动规划
## 决策引擎
- [ ] 平台性能评估模型
- [ ] 负载均衡策略
- [ ] 成本优化算法
15.5 扩展平台路线图
已支持平台:
| 平台 |
状态 |
命令协议 |
自动发送 |
SSE 拦截 |
| DeepSeek |
✅ 稳定 |
✅ |
✅ |
✅ |
| 豆包 |
✅ 稳定 |
✅ |
✅ |
✅ |
| 腾讯元宝 |
✅ 稳定 |
✅ |
✅ |
✅ |
| 通义千问 |
✅ 稳定 |
✅ |
✅ |
✅ |
计划支持平台:
| 平台 |
优先级 |
预估工作量 |
备注 |
| Claude (anthropic.com) |
P2 |
3 天 |
Web 类 SSE |
| ChatGPT (openai.com) |
P2 |
3 天 |
标准 SSE |
| Gemini (gemini.google.com) |
P3 |
5 天 |
双向流 |
| 文心一言 |
P3 |
5 天 |
百度系 |
| 讯飞星火 |
P3 |
5 天 |
科大讯飞 |
15.6 技术选型哲学
为什么选择当前技术栈:
| 组件 |
选择理由 |
替代方案 |
| TypeScript 前端 |
类型安全、IDE 友好 |
JavaScript |
| Python 后端 |
丰富的系统集成能力 |
Node.js / Go |
| WebSocket |
双向实时通信 |
HTTP Long Polling |
| 纯前端注入 |
零部署成本 |
浏览器扩展 |
| 固定缓冲区 |
确定性能、无 GC |
动态字符串 |
核心设计原则:
- 最小化依赖:每个模块独立,无隐式依赖
- 确定性行为:无随机性、可预测的性能
- 渐进式增强:基础功能优先,优雅降级
- 向后兼容:协议格式稳定,API 向后兼容
15.7 性能基准测试
目标性能指标:
| 指标 |
当前状态 |
目标值 |
测试方法 |
| 命令解析延迟 |
< 1ms |
< 0.5ms |
微基准测试 |
| 内存峰值 |
< 10MB |
< 5MB |
内存剖析 |
| 并发连接数 |
10 |
100 |
压力测试 |
| SSE 拦截延迟 |
< 5ms |
< 1ms |
端到端延迟 |
| 自动发送成功率 |
95% |
99% |
长期稳定性测试 |
手册版本:v0.1.0-preview
最后更新:2026年2月
维护者:MiniMax