核心概念
ChannelPipeline 是 Netty 中基于责任链模式实现的事件处理机制,负责管理 ChannelHandler 的执行顺序。
每个 Channel 都拥有一个独立的 Pipeline,所有 I/O 事件和用户自定义事件都会在 Pipeline 中传播。
Pipeline 的结构
双向链表设计
Pipeline 内部维护了一个双向链表,节点类型为 ChannelHandlerContext:
┌─────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────┐
│Head │ ←→ │Handler1 │ ←→ │Handler2 │ ←→ │Handler3 │ ←→ │ Tail │
└─────┘ └─────────┘ └─────────┘ └─────────┘ └──────┘
↓ ↓ ↓ ↓ ↓
Inbound Inbound Outbound Outbound Outbound
核心源码结构
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head; // 头节点
final AbstractChannelHandlerContext tail; // 尾节点
public DefaultChannelPipeline(Channel channel) {
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
事件传播机制
1. 入站事件(Inbound)
传播方向:Head → Tail(从链表头到尾)
常见入站事件:
channelRegistered:Channel 注册到 EventLoopchannelActive:Channel 激活channelRead:读取数据channelInactive:Channel 关闭exceptionCaught:异常捕获
代码示例:
ch.pipeline()
.addLast("decoder", new StringDecoder()) // 解码器
.addLast("handler1", new MyInboundHandler1()) // 业务处理 1
.addLast("handler2", new MyInboundHandler2()); // 业务处理 2
事件传播流程:
public class MyInboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Handler1 处理");
ctx.fireChannelRead(msg); // 传递给下一个 Handler
}
}
public class MyInboundHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Handler2 处理");
// 不调用 ctx.fireChannelRead(),事件传播终止
}
}
2. 出站事件(Outbound)
传播方向:Tail → Head(从链表尾到头)
常见出站事件:
bind:绑定端口connect:建立连接write:写数据flush:刷新缓冲区close:关闭连接
代码示例:
ch.pipeline()
.addLast("handler", new MyBusinessHandler()) // 业务处理
.addLast("encoder", new StringEncoder()); // 编码器
事件传播流程:
// 业务代码发起写操作
ctx.writeAndFlush("Hello Netty");
// Pipeline 传播顺序:Tail → encoder → Head → 网络发送
源码实现细节
1. Handler 的添加过程
// ChannelPipeline.addLast()
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1. 检查是否重复添加(@Sharable 注解除外)
checkMultiplicity(handler);
// 2. 创建 ChannelHandlerContext 包装 Handler
newCtx = newContext(group, filterName(name, handler), handler);
// 3. 插入链表(tail 之前)
addLast0(newCtx);
}
// 4. 回调 handlerAdded()
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
2. 入站事件传播源码
// ChannelHandlerContext.fireChannelRead()
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
// 查找下一个入站 Handler
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound); // 跳过非入站 Handler
return ctx;
}
// 执行 Handler 的 channelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
next.invoker().invokeChannelRead(next, msg);
}
关键点:
- 自动跳过不符合类型的 Handler(入站事件跳过出站 Handler)
- 异常会自动触发
exceptionCaught()事件
3. 出站事件传播源码
// ChannelHandlerContext.write()
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise); // flush = false
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound(); // 向前查找
next.invokeWrite(msg, promise);
}
// 查找上一个出站 Handler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev; // 向前遍历
} while (!ctx.outbound);
return ctx;
}
核心设计要点
1. ChannelHandlerContext 的作用
为什么需要 Context 包装 Handler?
public interface ChannelHandlerContext {
Channel channel(); // 关联的 Channel
EventExecutor executor(); // 执行线程
ChannelHandler handler(); // 包装的 Handler
ChannelPipeline pipeline(); // 所属 Pipeline
ChannelHandlerContext fireChannelRead(Object msg); // 传播入站事件
ChannelFuture write(Object msg); // 传播出站事件
}
核心价值:
- 解耦 Handler 和 Pipeline:Handler 可以复用,Context 维护位置信息
- 控制事件传播:
ctx.fireChannelRead()vschannel.pipeline().fireChannelRead() - 线程调度:可指定 Handler 在特定线程池中执行
2. 事件传播的两种方式
方式 1:从当前 Handler 的下一个开始传播(推荐)
ctx.fireChannelRead(msg); // 从下一个 InboundHandler 开始
ctx.write(msg); // 从上一个 OutboundHandler 开始
方式 2:从 Pipeline 的头/尾开始传播
ctx.channel().pipeline().fireChannelRead(msg); // 从 Head 开始
ctx.channel().write(msg); // 从 Tail 开始
陷阱示例:
// ❌ 错误:会导致死循环
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().pipeline().fireChannelRead(msg); // 又从 Head 开始,死循环
}
// ✅ 正确:从下一个 Handler 开始
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
3. 异常处理机制
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// 业务逻辑
} catch (Exception e) {
ctx.fireExceptionCaught(e); // 触发异常事件
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close(); // 关闭连接
}
}
最佳实践:在 Pipeline 末尾添加统一的异常处理器
ch.pipeline()
.addLast("decoder", new MyDecoder())
.addLast("handler", new MyHandler())
.addLast("exceptionHandler", new GlobalExceptionHandler()); // 兜底
实际应用场景
典型 Pipeline 配置
ch.pipeline()
// 1. 协议解码
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4))
// 2. 业务解码
.addLast("protobufDecoder", new ProtobufDecoder(MyProto.Request.getDefaultInstance()))
// 3. 业务处理(可能耗时,使用独立线程池)
.addLast(bizExecutor, "businessHandler", new BusinessHandler())
// 4. 业务编码
.addLast("protobufEncoder", new ProtobufEncoder())
// 5. 协议编码
.addLast("frameEncoder", new LengthFieldPrepender(4))
// 6. 异常兜底
.addLast("exceptionHandler", new ExceptionHandler());
面试答题总结
Pipeline 的核心原理:
- 双向链表:维护 ChannelHandlerContext 节点
- 责任链模式:事件在链表中依次传播
- 入站事件:Head → Tail
- 出站事件:Tail → Head
关键设计:
- ChannelHandlerContext 包装 Handler,实现解耦和复用
ctx.fireChannelRead()vspipeline().fireChannelRead()的区别- 异常会自动传播,需在末尾添加兜底处理器
性能优化:
- 耗时操作使用独立线程池:
addLast(executor, handler) - 避免在 Pipeline 中执行阻塞操作
典型应用:编解码、业务处理、日志记录、异常处理等层层递进。