用Python异步复刻Netty核心原理:从零实现简易版TCP通信框架
作为一名后端开发者,Netty 作为高性能的异步事件驱动网络框架,其核心设计思想(如 Channel、Pipeline、Handler 链式调用)一直是必学的知识点。为了检验自己对 Netty 核心原理的理解,我用 Python 异步(asyncio)实现了一个极简版的 Netty 核心框架——这只是纯学习用的 Demo,不考虑性能、异常处理完善性等生产级要求,仅用于拆解 Netty 核心逻辑,本文会完整分享这个 Demo 的实现思路和核心代码。
一、Netty 核心概念先梳理
在看代码前,先快速回顾 Netty 几个核心组件的作用,这是理解整个 Demo 的基础:
- Channel:代表一个网络连接,封装了读写数据的底层流(对应 Python 的 asyncio.StreamReader/StreamWriter);
- ChannelContext:上下文对象,持有 Channel 和 Pipeline 的引用,是 Handler 与 Channel/Pipeline 交互的桥梁;
- Pipeline:处理器流水线,管理所有 Handler,负责按顺序执行入站(Inbound)/出站(Outbound)处理器;
- Handler:处理器,分为入站(处理读事件)、出站(处理写事件)、通用处理器,是业务逻辑的载体;
- 入站/出站方向:入站(从网络端到应用层,如读取客户端数据)是正向遍历 Pipeline,出站(从应用层到网络端,如发送数据给客户端)是反向遍历 Pipeline。
二、项目结构解析
先看整个 Demo 的目录结构,清晰的结构是复刻 Netty 思想的第一步:
SimpleTcpTransmit/
├─ ImitateNetty/ # 核心框架层:复刻Netty核心组件
│ └─ src/
│ ├─ Context/ # 上下文相关:Channel、ChannelContext、Pipeline、初始化
│ │ ├─ Channel.py # 封装TCP连接的读写流
│ │ ├─ ChannelContext.py # 上下文对象:关联Channel和Pipeline
│ │ ├─ ChannelInit.py # 连接初始化:创建Channel/Pipeline/Context
│ │ └─ Pipeline.py # 处理器流水线:核心的Handler管理与执行
│ └─ HandleAbc/ # 处理器抽象层:定义各类Handler的接口
│ ├─ HandleAbc.py # 所有Handler的基类
│ ├─ InBountHandle.py # 入站处理器抽象(读事件)
│ ├─ OutBoundHandle.py # 出站处理器抽象(写事件)
│ ├─ GeneralHandle.py # 通用处理器(同时支持入站/出站)
│ └─ StartHandle.py # 入口处理器:启动Handler链执行
├─ service/ # 业务层:配置、业务Handler、服务启动
│ ├─ resource/
│ │ └─ application.toml # 配置文件:服务地址/端口
│ └─ src/
│ ├─ conf/
│ │ └─ ApplicationConf.py # 配置读取工具
│ ├─ Handle/
│ │ ├─ Inbount/
│ │ │ └─ SimpleDecode.py # 自定义入站处理器:解码字节为字符串
│ │ └─ outbound/
│ │ └─ SimpleEncoder.py # 自定义出站处理器:编码字符串为字节
│ └─ ServiceApplication.py # 服务端启动入口
三、核心框架层实现(ImitateNetty)
这一层是整个 Demo 的核心,完全复刻 Netty 核心组件的设计思想,我们按“抽象接口→核心实体→流水线执行”的顺序拆解。
3.1 处理器抽象层(HandleAbc)
首先定义所有处理器的抽象接口,约定入站/出站处理器的核心方法:
(1)所有Handler的基类:HandleAbc.py
from abc import ABC, abstractmethod
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from typing import Any
class HandleAbc(ABC):
# 预留“传递到下一个处理器”方法(Demo中暂用递归实现)
async def nextHandle(self, contx: ChannelContext, msg: Any, handleName: str = None):
pass
(2)入站处理器抽象:InBountHandle.py
入站处理器负责处理“读事件”(如读取客户端发送的数据),核心方法是 read:
from abc import ABC, abstractmethod
from ImitateNetty.src.HandleAbc.HandleAbc import HandleAbc
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from typing import Any
class InbountHandle(HandleAbc):
@abstractmethod
async def read(self, context: ChannelContext, msg: Any) -> Any:
"""读事件核心方法:处理入站消息,返回处理后的消息"""
pass
async def activation(self):
"""激活事件(Demo暂未实现)"""
pass
async def inactive(self):
"""不活跃事件(Demo暂未实现)"""
pass
(3)出站处理器抽象:OutBoundHandle.py
出站处理器负责处理“写事件”(如向客户端发送数据),核心方法是 write:
from abc import ABC, abstractmethod
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from typing import Any
from ImitateNetty.src.HandleAbc.HandleAbc import HandleAbc
class OutBoundHandle(HandleAbc):
@abstractmethod
async def write(self, context: ChannelContext, msg: Any) -> Any:
"""写事件核心方法:处理出站消息,返回处理后的消息"""
pass
(4)通用处理器抽象:GeneralHandle.py
通用处理器同时实现入站和出站接口,适用于需要同时处理读写的场景:
from typing import Any
from abc import ABC, abstractmethod
from ImitateNetty.src.HandleAbc.InBountHandle import InbountHandle
from ImitateNetty.src.HandleAbc.OutBoundHandle import OutBoundHandle
class GeneralHandle(InbountHandle, OutBoundHandle, ABC):
@abstractmethod
async def fun(self, context, msg: Any):
"""通用处理方法(Demo中作为扩展入口)"""
pass
(5)入口处理器:StartHandle.py
StartHandle 是 Pipeline 的“起点”,负责启动入站/出站处理器链的执行,同时实现通用处理器接口:
from ImitateNetty.src.HandleAbc.InBountHandle import InbountHandle
from ImitateNetty.src.HandleAbc.OutBoundHandle import OutBoundHandle
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc.GeneralHandle import GeneralHandle
from typing import Any
class StartHandle(GeneralHandle):
async def fun(self, context, msg: Any):
pass # 预留扩展方法
# 启动入站处理器链(正向遍历Pipeline)
async def inBount(self, context: ChannelContext, msg: Any):
await self.fun(context, msg)
handleList = context.pipeline.getHandleList()
# 寻找第一个入站处理器并执行
for handleIndex in range(len(handleList)):
handle = context.pipeline.getHandleByIndex(handleIndex)
if issubclass(handle.__class__, InbountHandle):
await context.pipeline.exceHandle(
handleName=handleList[handleIndex],
context=context,
msg=msg,
nextHandleClass=InbountHandle
)
break
# 启动出站处理器链(反向遍历Pipeline)
async def outBount(self, context: ChannelContext, msg: Any):
await self.fun(context, msg)
handleList = context.pipeline.getHandleList()
# 反向寻找第一个出站处理器并执行
for handleIndex in range(1, len(handleList)+1):
handleIndex = 0 - handleIndex # 反向索引
handle = context.pipeline.getHandleByIndex(handleIndex)
if issubclass(handle.__class__, OutBoundHandle):
await context.pipeline.exceHandle(
handleName=handleList[handleIndex],
context=context,
msg=msg,
nextHandleClass=OutBoundHandle
)
break
# 实现入站read方法(透传消息)
async def read(self, context: ChannelContext, msg: Any):
return msg
# 实现出站write方法(最终将消息写入网络流)
async def write(self, context: ChannelContext, msg: Any):
context.channel.write.write(msg)
await context.channel.write.drain()
3.2 上下文核心组件(Context)
这一层封装连接、上下文、流水线,是 Handler 交互的核心载体。
(1)Channel:封装TCP连接(Channel.py)
Channel 对应一个 TCP 连接,封装 asyncio 的读写流,提供消息发送入口(出站起点):
import asyncio
from typing import Any
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc.StartHandle import StartHandle
class Channel():
def __init__(self, read: asyncio.StreamReader, write: asyncio.StreamWriter):
self.write: asyncio.StreamWriter = write # 写流(发送数据)
self.read: asyncio.StreamReader = read # 读流(接收数据)
# 发送消息:从StartHandle开始反向执行出站处理器链
async def sendMsg(self, context: ChannelContext, msg: Any):
await context.pipeline.getHandleByName(StartHandle).outBount(context, msg)
(2)ChannelContext:上下文对象(ChannelContext.py)
上下文持有 Channel 和 Pipeline 的引用,解决循环导入问题(TYPE_CHECKING 仅类型检查时导入):
from typing import TYPE_CHECKING
# 仅类型检查时导入,避免运行时循环导入
if TYPE_CHECKING:
from ImitateNetty.src.Context.Pipeline import Pipeline
from ImitateNetty.src.Context.Channel import Channel
class ChannelContext():
def __init__(self, pipeline: "Pipeline", channel: "Channel"):
self.pipeline: "Pipeline" = pipeline # 关联的流水线
self.channel: "Channel" = channel # 关联的连接
(3)ChannelInit:连接初始化(ChannelInit.py)
负责创建 Channel、Pipeline、Context,管理全局上下文(所有连接的上下文):
import asyncio
from typing import Dict, List, Any, Type
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.Context.Channel import Channel
from ImitateNetty.src.Context.Pipeline import Pipeline
from ImitateNetty.src.HandleAbc.HandleAbc import HandleAbc
from ImitateNetty.src.HandleAbc.StartHandle import StartHandle
class ChannelInit():
# 全局上下文:key=StreamWriter,value=ChannelContext
allContext: Dict[Any, ChannelContext] = dict()
@staticmethod
async def init(read: asyncio.StreamReader, write: asyncio.StreamWriter, handles: List[HandleAbc]):
# 1. 创建Channel(封装读写流)
channel: Channel = Channel(read, write)
# 2. 创建Pipeline(流水线)
pipeline: Pipeline = Pipeline()
# 3. 添加处理器(先加StartHandle,再加业务处理器)
ChannelInit._setHandle(pipeline, handles)
# 4. 创建上下文并加入全局管理
context: ChannelContext = ChannelContext(pipeline, channel)
ChannelInit.allContext[ChannelInit._generateContextKey(write)] = context
@staticmethod
def _setHandle(pipeline: Pipeline, handles):
# 添加入口处理器(StartHandle是固定第一个)
pipeline.addHandle(StartHandle())
# 添加业务处理器(校验是否为HandleAbc子类)
for ele in handles:
if not isinstance(ele, HandleAbc):
raise ValueError(f"对象:{ele}不是HandleAbc的子类")
pipeline.addHandle(ele)
@staticmethod
def _generateContextKey(write):
return write # 用StreamWriter作为上下文唯一标识
(4)Pipeline:处理器流水线(核心)
Pipeline 是整个框架的“心脏”,负责管理 Handler、按方向(入站/出站)执行 Handler 链。核心逻辑:
- 入站(Inbound):正向遍历 Handler 列表;
- 出站(Outbound):反向遍历 Handler 列表;
- 通用 Handler:同时支持入站/出站遍历。
from typing import Any, List, TYPE_CHECKING
if TYPE_CHECKING:
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc import HandleAbc
from ImitateNetty.src.HandleAbc.InBountHandle import InbountHandle
from ImitateNetty.src.HandleAbc.OutBoundHandle import OutBoundHandle
from ImitateNetty.src.HandleAbc.GeneralHandle import GeneralHandle
class Pipeline():
def __init__(self):
self._pipeline = [] # Handler类列表(维护顺序)
self._handles: dict[Any, HandleAbc] = {} # Handler实例映射(类→实例)
# 添加处理器:记录类和实例
def addHandle(self, handle):
handleName = handle.__class__
self._pipeline.append(handleName)
self._handles[handleName] = handle
# 移除处理器
def removeHandle(self, handleName):
if handleName not in self._pipeline:
raise ValueError(f"试图移除不存在的处理器:{handleName}")
self._pipeline.remove(handleName)
del self._handles[handleName]
# 按类名获取处理器实例
def getHandleByName(self, handleName) -> HandleAbc:
return self._handles[handleName]
# 按索引获取处理器实例
def getHandleByIndex(self, handleIndex: int) -> HandleAbc:
handleName = self._pipeline[handleIndex]
return self.getHandleByName(handleName)
# 获取处理器列表(副本,防止外部修改)
def getHandleList(self) -> List[Any]:
return self._pipeline.copy()
# 核心:执行处理器(根据类型分发到入站/出站/通用逻辑)
async def exceHandle(self, handleName, context: "ChannelContext", msg: Any, nextHandleClass):
if nextHandleClass == InbountHandle:
await self.exceInbountHandle(handleName, context, msg)
elif nextHandleClass == OutBoundHandle:
await self.exceOutBound(handleName, context, msg)
elif nextHandleClass == GeneralHandle:
await self.exceGeneralHandle(handleName, context, msg, nextHandleClass)
# 执行入站处理器(正向遍历)
async def exceInbountHandle(self, handleName: Any, context: "ChannelContext", msg: Any):
nowHandle: InbountHandle = self.getHandleByName(handleName)
newMsg = await nowHandle.read(context, msg) # 执行当前处理器
handleNameList = self.getHandleList()
nowHandleIndex = handleNameList.index(handleName)
# 遍历下一个处理器(正向)
for index in range(nowHandleIndex + 1, len(handleNameList)):
if index == len(handleNameList) - 1:
return newMsg
nextHandle = self.getHandleByIndex(index)
# 找到下一个入站/通用处理器并执行
if isinstance(nextHandle, (InbountHandle, GeneralHandle)):
await self.exceHandle(handleNameList[index], context, newMsg, InbountHandle)
break
# 执行出站处理器(反向遍历)
async def exceOutBound(self, handleName: Any, context: "ChannelContext", msg: Any):
nowHandle: OutBoundHandle = self.getHandleByName(handleName)
newMsg = await nowHandle.write(context, msg) # 执行当前处理器
handleNameList = self.getHandleList()
nowHandleIndex = handleNameList.index(handleName)
if nowHandleIndex == 0:
return newMsg
# 反向遍历找下一个出站/通用处理器
for index in range(len(handleNameList)-nowHandleIndex-1, len(handleNameList)+1):
nextHandleIndex = -index
nextHandle = self.getHandleByIndex(nextHandleIndex)
if isinstance(nextHandle, (OutBoundHandle, GeneralHandle)):
await self.exceHandle(handleNameList[nextHandleIndex], context, newMsg, OutBoundHandle)
break
# 执行通用处理器(根据入站/出站方向继续遍历)
async def exceGeneralHandle(self, handleName: Any, context: "ChannelContext", msg: Any, nextHandleClass):
handle = self.getHandleByName(handleName)
newMsg = await handle.fun(context, msg)
handleNameList = self.getHandleList()
nowHandleIndex = handleNameList.index(handleName)
if nextHandleClass == InbountHandle:
# 入站方向:正向找下一个入站处理器
if nowHandleIndex == len(handleNameList) - 1:
return newMsg
for index in range(nowHandleIndex + 1, len(handleNameList)):
nextHandle = self.getHandleByIndex(index)
if nextHandle.__class__ == nextHandleClass:
await self.exceHandle(handleNameList[index], context, newMsg, nextHandleClass)
break
elif nextHandleClass == OutBoundHandle:
# 出站方向:反向找下一个出站处理器
if nowHandleIndex == -len(handleNameList):
return newMsg
for index in range(len(handleNameList)-nowHandleIndex-1, len(handleNameList)+1):
nextHandleIndex = -index
nextHandle = self.getHandleByIndex(nextHandleIndex)
if isinstance(nextHandle, (OutBoundHandle, GeneralHandle)):
await self.exceHandle(handleNameList[nextHandleIndex], context, newMsg, OutBoundHandle)
break
四、业务层实现(service)
框架层实现后,业务层只需编写自定义 Handler、读取配置、启动服务即可。
4.1 配置读取(ApplicationConf.py)
读取 TOML 配置文件,获取服务地址和端口:
import tomllib
class ApplicationConf():
conf: dict = None
# 读取配置文件(注意路径:相对于启动脚本)
with open("../resource/application.toml","rb") as f:
conf: dict = tomllib.load(f)
application.toml 配置内容:
[TCP.service]
addr = "127.0.0.1"
port = 8888
4.2 自定义业务Handler
(1)入站处理器:SimpleDecode.py(解码字节→字符串)
from typing import Any
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc.InBountHandle import InbountHandle
from loguru import logger
class SimpleDecode(InbountHandle):
async def read(self, context: ChannelContext, msg: Any):
# 字节解码为UTF-8字符串
msg = msg.decode("utf-8")
logger.info("接收到消息:{}", msg)
# 回复客户端(触发出站处理器链)
await context.channel.sendMsg(context, msg)
return msg
(2)出站处理器:SimpleEncoder.py(编码字符串→字节)
from typing import Any
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc.OutBoundHandle import OutBoundHandle
class SimpleEncoder(OutBoundHandle):
async def write(self, context: ChannelContext, msg: Any):
# 字符串编码为字节
msg = msg.encode("utf-8")
return msg
4.3 服务端启动入口(ServiceApplication.py)
import asyncio
from ImitateNetty.src.Context.ChannelInit import ChannelInit
from ImitateNetty.src.Context.ChannelContext import ChannelContext
from ImitateNetty.src.HandleAbc.StartHandle import StartHandle
from service.src.Handle.outbound.SimpleEncoder import SimpleEncoder
from service.src.Handle.Inbount.SimpleDecode import SimpleDecode
from service.src.conf.ApplicationConf import ApplicationConf
from loguru import logger
async def handleFun(read: asyncio.StreamReader, write: asyncio.StreamWriter):
"""每个客户端连接的处理函数"""
peer = write.get_extra_info("peername")
logger.debug("客户端:{} 连接到服务器", peer)
# 1. 定义业务处理器列表
handleList = [SimpleDecode(), SimpleEncoder()]
# 2. 初始化连接(创建Channel/Pipeline/Context)
await ChannelInit.init(read, write, handleList)
# 3. 循环读取客户端数据
while True:
data = await read.read(1024)
if not data: # 客户端断开连接
logger.debug("客户端:{} 断开连接", peer)
break
# 4. 获取上下文,启动入站处理器链
context: ChannelContext = ChannelInit.allContext[write]
await context.pipeline.exceHandle(StartHandle, context, data, InbountHandle)
async def main():
"""启动TCP服务端"""
# 从配置读取地址和端口
host = ApplicationConf.conf['TCP']['service']['addr']
port = ApplicationConf.conf['TCP']['service']['port']
# 创建TCP服务
server = await asyncio.start_server(handleFun, host=host, port=port)
logger.info("服务器启动完毕:{}", server.sockets[0].getsockname())
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
五、运行与测试
5.1 运行服务端
执行 ServiceApplication.py,日志会输出:
2026-01-15 10:00:00.000 | INFO | __main__:main:35 - 服务器启动完毕:('127.0.0.1', 8888)
5.2 测试客户端
用 telnet 或 Python 简易客户端测试:
# 简易客户端代码
import asyncio
async def test_client():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
# 发送消息
msg = "Hello Netty Demo!"
writer.write(msg.encode('utf-8'))
await writer.drain()
# 接收回复
data = await reader.read(1024)
print(f"收到服务器回复: {data.decode('utf-8')}")
# 关闭连接
writer.close()
await writer.wait_closed()
asyncio.run(test_client())
服务端日志会输出:
2026-01-15 10:01:00.000 | DEBUG | __main__:handleFun:12 - 客户端:('127.0.0.1', 54321) 连接到服务器
2026-01-15 10:01:00.001 | INFO | service.src.Handle.Inbount.SimpleDecode:read:11 - 接收到消息:Hello Netty Demo!
2026-01-15 10:01:00.002 | DEBUG | __main__:handleFun:22 - 客户端:('127.0.0.1', 54321) 断开连接
客户端会收到回复:Hello Netty Demo!,验证整个链路(入站解码→出站编码→消息回写)正常工作。
六、总结与反思
6.1 核心收获(Netty原理)
通过这个 Demo,我更深刻理解了 Netty 的核心设计:
- Handler 链式调用:Pipeline 是 Handler 的容器,入站正向、出站反向遍历是 Netty 高性能的核心之一;
- 上下文解耦:ChannelContext 封装 Channel 和 Pipeline,让 Handler 无需直接依赖底层连接;
- 抽象分层:入站/出站/通用 Handler 的抽象,让业务逻辑与框架核心解耦。
6.2 Demo 的不足(仅用于学习)
再次强调:这只是学习 Demo,绝对不建议用于生产环境,主要不足:
- 异常处理缺失:未处理网络断开、编码失败等异常;
- 性能问题:用递归实现 Handler 链式调用,高并发下会栈溢出;
- 功能简化:未实现 Netty 的 EventLoop、ChannelPool、心跳检测等核心特性;
- 资源管理:全局上下文未清理断开的连接,会导致内存泄漏。
6.3 后续学习方向
这个 Demo 只是入门,后续会继续深入:
- 补全 Netty 核心特性(EventLoop、心跳、粘包拆包);
- 优化 Python 异步实现(用迭代代替递归、添加资源回收);
- 对比 Netty 源码,理解更底层的 Reactor 模型。
希望这篇分享能帮助和我一样学习 Netty 的同学——通过“手写简易版”的方式,把抽象的框架原理落地成可运行的代码,是理解技术本质最好的方式。