核心概念

ChannelPipeline 是 Netty 中基于责任链模式实现的事件处理机制,负责管理 ChannelHandler 的执行顺序。

每个 Channel 都拥有一个独立的 Pipeline,所有 I/O 事件和用户自定义事件都会在 Pipeline 中传播。


Pipeline 的结构

双向链表设计

Pipeline 内部维护了一个双向链表,节点类型为 ChannelHandlerContext

┌─────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌──────┐
│Head │ ←→ │Handler1 │ ←→ │Handler2 │ ←→ │Handler3 │ ←→ │ Tail │
└─────┘    └─────────┘    └─────────┘    └─────────┘    └──────┘
  ↓            ↓             ↓             ↓              ↓
Inbound     Inbound       Outbound      Outbound      Outbound

核心源码结构

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;  // 头节点
    final AbstractChannelHandlerContext tail;  // 尾节点
    
    public DefaultChannelPipeline(Channel channel) {
        this.channel = channel;
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
}

事件传播机制

1. 入站事件(Inbound)

传播方向:Head → Tail(从链表头到尾)

常见入站事件

  • channelRegistered:Channel 注册到 EventLoop
  • channelActive:Channel 激活
  • channelRead:读取数据
  • channelInactive:Channel 关闭
  • exceptionCaught:异常捕获

代码示例

ch.pipeline()
  .addLast("decoder", new StringDecoder())       // 解码器
  .addLast("handler1", new MyInboundHandler1())  // 业务处理 1
  .addLast("handler2", new MyInboundHandler2()); // 业务处理 2

事件传播流程

public class MyInboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Handler1 处理");
        ctx.fireChannelRead(msg);  // 传递给下一个 Handler
    }
}

public class MyInboundHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Handler2 处理");
        // 不调用 ctx.fireChannelRead(),事件传播终止
    }
}

2. 出站事件(Outbound)

传播方向:Tail → Head(从链表尾到头)

常见出站事件

  • bind:绑定端口
  • connect:建立连接
  • write:写数据
  • flush:刷新缓冲区
  • close:关闭连接

代码示例

ch.pipeline()
  .addLast("handler", new MyBusinessHandler())  // 业务处理
  .addLast("encoder", new StringEncoder());     // 编码器

事件传播流程

// 业务代码发起写操作
ctx.writeAndFlush("Hello Netty");

// Pipeline 传播顺序:Tail → encoder → Head → 网络发送

源码实现细节

1. Handler 的添加过程

// ChannelPipeline.addLast()
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. 检查是否重复添加(@Sharable 注解除外)
        checkMultiplicity(handler);
        
        // 2. 创建 ChannelHandlerContext 包装 Handler
        newCtx = newContext(group, filterName(name, handler), handler);
        
        // 3. 插入链表(tail 之前)
        addLast0(newCtx);
    }
    
    // 4. 回调 handlerAdded()
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

2. 入站事件传播源码

// ChannelHandlerContext.fireChannelRead()
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

// 查找下一个入站 Handler
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);  // 跳过非入站 Handler
    return ctx;
}

// 执行 Handler 的 channelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    next.invoker().invokeChannelRead(next, msg);
}

关键点

  • 自动跳过不符合类型的 Handler(入站事件跳过出站 Handler)
  • 异常会自动触发 exceptionCaught() 事件

3. 出站事件传播源码

// ChannelHandlerContext.write()
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    write(msg, false, promise);  // flush = false
    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();  // 向前查找
    next.invokeWrite(msg, promise);
}

// 查找上一个出站 Handler
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;  // 向前遍历
    } while (!ctx.outbound);
    return ctx;
}

核心设计要点

1. ChannelHandlerContext 的作用

为什么需要 Context 包装 Handler?

public interface ChannelHandlerContext {
    Channel channel();                        // 关联的 Channel
    EventExecutor executor();                 // 执行线程
    ChannelHandler handler();                 // 包装的 Handler
    ChannelPipeline pipeline();               // 所属 Pipeline
    
    ChannelHandlerContext fireChannelRead(Object msg);  // 传播入站事件
    ChannelFuture write(Object msg);                    // 传播出站事件
}

核心价值

  1. 解耦 Handler 和 Pipeline:Handler 可以复用,Context 维护位置信息
  2. 控制事件传播ctx.fireChannelRead() vs channel.pipeline().fireChannelRead()
  3. 线程调度:可指定 Handler 在特定线程池中执行

2. 事件传播的两种方式

方式 1:从当前 Handler 的下一个开始传播(推荐)

ctx.fireChannelRead(msg);  // 从下一个 InboundHandler 开始
ctx.write(msg);            // 从上一个 OutboundHandler 开始

方式 2:从 Pipeline 的头/尾开始传播

ctx.channel().pipeline().fireChannelRead(msg);  // 从 Head 开始
ctx.channel().write(msg);                       // 从 Tail 开始

陷阱示例

// ❌ 错误:会导致死循环
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.channel().pipeline().fireChannelRead(msg);  // 又从 Head 开始,死循环
}

// ✅ 正确:从下一个 Handler 开始
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

3. 异常处理机制

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // 业务逻辑
        } catch (Exception e) {
            ctx.fireExceptionCaught(e);  // 触发异常事件
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();  // 关闭连接
    }
}

最佳实践:在 Pipeline 末尾添加统一的异常处理器

ch.pipeline()
  .addLast("decoder", new MyDecoder())
  .addLast("handler", new MyHandler())
  .addLast("exceptionHandler", new GlobalExceptionHandler());  // 兜底

实际应用场景

典型 Pipeline 配置

ch.pipeline()
  // 1. 协议解码
  .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1024, 0, 4))
  
  // 2. 业务解码
  .addLast("protobufDecoder", new ProtobufDecoder(MyProto.Request.getDefaultInstance()))
  
  // 3. 业务处理(可能耗时,使用独立线程池)
  .addLast(bizExecutor, "businessHandler", new BusinessHandler())
  
  // 4. 业务编码
  .addLast("protobufEncoder", new ProtobufEncoder())
  
  // 5. 协议编码
  .addLast("frameEncoder", new LengthFieldPrepender(4))
  
  // 6. 异常兜底
  .addLast("exceptionHandler", new ExceptionHandler());

面试答题总结

Pipeline 的核心原理

  1. 双向链表:维护 ChannelHandlerContext 节点
  2. 责任链模式:事件在链表中依次传播
  3. 入站事件:Head → Tail
  4. 出站事件:Tail → Head

关键设计

  • ChannelHandlerContext 包装 Handler,实现解耦和复用
  • ctx.fireChannelRead() vs pipeline().fireChannelRead() 的区别
  • 异常会自动传播,需在末尾添加兜底处理器

性能优化

  • 耗时操作使用独立线程池:addLast(executor, handler)
  • 避免在 Pipeline 中执行阻塞操作

典型应用:编解码、业务处理、日志记录、异常处理等层层递进。