核心概念

Dubbo 服务调用流程可以分为三个阶段

  1. 启动阶段:服务注册与发现
  2. 调用阶段:从方法调用到网络传输
  3. 响应阶段:接收响应并返回结果

整个过程涉及代理拦截、路由选择、负载均衡、协议封装、网络传输、序列化反序列化等多个环节。

完整调用流程

阶段一:启动与准备

// 1. Provider 启动:暴露服务
@Service
public class UserServiceImpl implements UserService {
    public User getUser(Long userId) {
        return userMapper.selectById(userId);
    }
}

// 启动流程:
// 1.1 Spring 扫描 @Service 注解
// 1.2 创建 ServiceConfig 对象
// 1.3 通过 Protocol 暴露服务(启动 Netty Server)
// 1.4 向 Registry 注册服务
//     dubbo://192.168.1.100:20880/com.example.UserService?timeout=3000&...

// 2. Consumer 启动:引用服务
@Reference
private UserService userService;

// 启动流程:
// 2.1 Spring 扫描 @Reference 注解
// 2.2 创建 ReferenceConfig 对象
// 2.3 向 Registry 订阅服务
// 2.4 获取 Provider 地址列表
// 2.5 创建动态代理对象(Javassist/JDK Proxy)
// 2.6 与 Provider 建立 TCP 长连接

阶段二:调用过程(Consumer 端)

// 用户代码调用
User user = userService.getUser(123L);

// ==================== Consumer 端调用链路 ====================

// 【1. Proxy 层】动态代理拦截
InvokerInvocationHandler.invoke() {
    // 构造 RpcInvocation 对象
    RpcInvocation invocation = new RpcInvocation();
    invocation.setMethodName("getUser");
    invocation.setParameterTypes(new Class[]{Long.class});
    invocation.setArguments(new Object[]{123L});
    
    // 调用 Invoker
    return invoker.invoke(invocation);
}

// 【2. Filter 链】执行过滤器
ConsumerContextFilter.invoke() {
    // 设置上下文信息(调用方 IP、时间戳等)
    RpcContext.getContext().setRemoteAddress(...);
    return invoker.invoke(invocation);
}

MonitorFilter.invoke() {
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    // 记录调用统计
    monitor.collect(new Statistics(...));
    return result;
}

// 【3. Cluster 层】集群容错
FailoverClusterInvoker.invoke() {
    // 获取所有可用的 Invoker(Provider 列表)
    List<Invoker> invokers = directory.list(invocation);
    // invokers = [Invoker1(192.168.1.100:20880), Invoker2(192.168.1.101:20880)]
    
    // 负载均衡选择一个 Invoker
    LoadBalance loadBalance = getLoadBalance();
    Invoker selectedInvoker = loadBalance.select(invokers, invocation);
    
    // 调用,如果失败则重试
    try {
        return selectedInvoker.invoke(invocation);
    } catch (RpcException e) {
        // Failover:失败后切换到其他 Provider
        selectedInvoker = loadBalance.select(invokers, invocation);
        return selectedInvoker.invoke(invocation);
    }
}

// 【4. Protocol 层】远程调用
DubboInvoker.invoke() {
    // 选择一个可用的 Client(NettyClient)
    ExchangeClient client = getClients()[0];
    
    // 异步发送请求
    CompletableFuture<Result> future = client.request(invocation);
    
    // 同步等待结果
    return future.get(timeout, TimeUnit.MILLISECONDS);
}

// 【5. Exchange 层】信息交换
HeaderExchangeChannel.request() {
    // 构造 Request 对象
    Request request = new Request();
    request.setId(AtomicLong.getAndIncrement());  // 生成唯一请求ID
    request.setData(invocation);
    request.setTwoWay(true);  // 需要响应
    
    // 创建 Future,等待响应
    DefaultFuture future = new DefaultFuture(channel, request, timeout);
    FUTURES.put(request.getId(), future);  // 保存到全局Map
    
    // 发送请求
    channel.send(request);
    return future;
}

// 【6. Codec 层】编码
DubboCodec.encode() {
    // 编码格式:
    // +------+------+------+------+--------+--------+--------+--------+
    // | Magic| Flag | Status| ReqID (8字节)  | Data Length     |
    // +------+------+------+------+--------+--------+--------+--------+
    // |           Serialized Data (N 字节)                     |
    // +------+------+------+------+--------+--------+--------+--------+
    
    buffer.writeShort(0xdabb);  // 魔数
    buffer.writeByte(flag);
    buffer.writeLong(request.getId());
    
    // 序列化数据
    byte[] data = serialization.serialize(invocation);
    buffer.writeInt(data.length);
    buffer.writeBytes(data);
}

// 【7. Serialize 层】序列化
Hessian2Serialization.serialize() {
    // 将 RpcInvocation 对象序列化为字节数组
    // methodName: "getUser"
    // parameterTypes: [Long.class]
    // arguments: [123L]
    return bytes;
}

// 【8. Transport 层】网络传输
NettyChannel.send() {
    // 通过 Netty 发送数据
    channel.writeAndFlush(message);
}

阶段三:处理与响应(Provider 端)

// ==================== Provider 端处理链路 ====================

// 【1. Transport 层】接收请求
NettyServerHandler.channelRead() {
    // Netty 接收到字节流
    received(channel, message);
}

// 【2. Codec 层】解码
DubboCodec.decode() {
    // 读取魔数、请求ID、数据长度
    short magic = buffer.readShort();
    long requestId = buffer.readLong();
    int dataLength = buffer.readInt();
    byte[] data = buffer.readBytes(dataLength);
    
    // 反序列化
    RpcInvocation invocation = serialization.deserialize(data);
    
    Request request = new Request(requestId);
    request.setData(invocation);
    return request;
}

// 【3. Exchange 层】处理请求
HeaderExchangeHandler.received() {
    // 提交到业务线程池处理
    ExecutorService executor = getExecutor();
    executor.execute(() -> handleRequest(channel, request));
}

// 【4. Filter 链】Provider 端过滤器
ProviderContextFilter.invoke() {
    // 设置 Provider 端上下文
    RpcContext.getContext().setRemoteAddress(channel.remoteAddress());
    return invoker.invoke(invocation);
}

ExecuteLimitFilter.invoke() {
    // 检查并发限制
    int max = getUrl().getParameter("executes", 0);
    if (max > 0 && count.get() >= max) {
        throw new RpcException("超过最大并发数");
    }
    count.incrementAndGet();
    try {
        return invoker.invoke(invocation);
    } finally {
        count.decrementAndGet();
    }
}

// 【5. Invoker 调用】执行真实服务
DubboProtocol.reply() {
    // 根据接口和方法名找到真实的服务实现
    Invoker invoker = exporterMap.get(serviceKey);
    
    // 调用真实方法
    return invoker.invoke(invocation);
}

// 【6. 真实方法执行】
UserServiceImpl.getUser(123L) {
    // 执行业务逻辑
    User user = userMapper.selectById(123L);
    return user;
}

// 【7. 构造响应】
Response response = new Response();
response.setId(request.getId());  // 使用请求的ID
response.setStatus(Response.OK);
response.setResult(user);  // 返回结果

// 【8. 编码并返回】
DubboCodec.encode(response);
channel.writeAndFlush(response);  // 通过 Netty 发送响应

阶段四:接收响应(Consumer 端)

// ==================== Consumer 端接收响应 ====================

// 【1. Transport 层】接收响应
NettyClientHandler.channelRead() {
    received(channel, message);
}

// 【2. Codec 层】解码响应
Response response = DubboCodec.decode(buffer);

// 【3. Exchange 层】处理响应
HeaderExchangeHandler.received() {
    // 根据请求ID找到对应的 Future
    DefaultFuture future = FUTURES.remove(response.getId());
    
    // 设置结果,唤醒等待线程
    future.complete(response.getResult());
}

// 【4. 返回结果】
// 用户线程从 future.get() 返回
User user = (User) future.get();

// 【5. 返回给调用方】
return user;

核心机制详解

1. 异步转同步

// Consumer 发送请求后,如何等待响应?
public class DefaultFuture extends CompletableFuture<Object> {
    
    // 全局 Map,保存所有等待响应的 Future
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.request = request;
        this.timeout = timeout;
        FUTURES.put(request.getId(), this);  // 保存到全局Map
        
        // 超时处理
        TIMEOUT_EXECUTOR.schedule(() -> {
            DefaultFuture future = FUTURES.remove(request.getId());
            if (future != null) {
                future.completeExceptionally(new TimeoutException());
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }
    
    // 接收到响应时调用
    public static void received(Response response) {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.complete(response.getResult());
        }
    }
}

// 调用流程:
// 1. 发送请求,生成 RequestID=100
// 2. 创建 DefaultFuture,放入 FUTURES[100]
// 3. future.get() 阻塞等待
// 4. 收到响应,ResponseID=100
// 5. 从 FUTURES[100] 取出 Future
// 6. future.complete(),唤醒等待线程

2. 负载均衡策略

// Random(随机)
public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
        int length = invokers.size();
        return invokers.get(random.nextInt(length));
    }
}

// RoundRobin(轮询)
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    private final AtomicInteger index = new AtomicInteger(0);
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
        return invokers.get(Math.abs(index.getAndIncrement() % invokers.size()));
    }
}

// LeastActive(最少活跃数)
public class LeastActiveLoadBalance extends AbstractLoadBalance {
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
        // 选择活跃请求数最少的 Invoker
        int leastActive = Integer.MAX_VALUE;
        Invoker<T> selected = null;
        for (Invoker<T> invoker : invokers) {
            int active = RpcStatus.getStatus(invoker.getUrl()).getActive();
            if (active < leastActive) {
                leastActive = active;
                selected = invoker;
            }
        }
        return selected;
    }
}

3. 容错机制

// Failover(失败自动切换,默认)
@Reference(cluster = "failover", retries = 2)
private UserService userService;

// Failfast(快速失败,不重试)
@Reference(cluster = "failfast")
private UserService userService;

// Failsafe(失败安全,忽略异常)
@Reference(cluster = "failsafe")
private UserService userService;

// Failback(失败自动恢复,后台记录失败请求,定时重发)
@Reference(cluster = "failback")
private UserService userService;

// Forking(并行调用多个服务器,只要一个成功即返回)
@Reference(cluster = "forking", forks = 2)
private UserService userService;

性能优化点

1. 连接复用

// 长连接 + 连接池
@Reference(
    connections = 10,   // 每个 Provider 10 个连接
    lazy = false        // 启动时建立连接
)
private UserService userService;

2. 异步调用

// 同步调用(阻塞)
User user = userService.getUser(123L);

// 异步调用(非阻塞)
CompletableFuture<User> future = RpcContext.getContext()
    .asyncCall(() -> userService.getUser(123L));

future.thenAccept(user -> {
    // 异步处理结果
    System.out.println(user);
});

3. 序列化选择

// 性能对比:Kryo > Hessian2 > JSON
@Service(serialization = "kryo")  // 最快,但需要注册类
@Service(serialization = "hessian2")  // 默认,性能与兼容性平衡
@Service(serialization = "protobuf")  // 跨语言友好

答题总结

Dubbo 服务调用的完整流程

1. 启动阶段

  • Provider 暴露服务并注册到注册中心
  • Consumer 订阅服务并创建动态代理

2. Consumer 端调用链

  • Proxy:动态代理拦截方法调用
  • Filter:执行过滤器链(监控、上下文、限流等)
  • Cluster:负载均衡选择 Provider,容错重试
  • Protocol:选择 Client 发起调用
  • Exchange:构造 Request,创建 Future
  • Codec:编码为字节流
  • Serialize:序列化参数
  • Transport:Netty 发送数据

3. Provider 端处理链

  • Transport:Netty 接收数据
  • Codec:解码字节流
  • Exchange:提交到业务线程池
  • Filter:执行 Provider 端过滤器
  • Invoker:调用真实服务方法
  • 响应:编码并返回结果

4. Consumer 端接收响应

  • 解码响应,根据 RequestID 找到 Future
  • 唤醒等待线程,返回结果

核心机制

  • 异步转同步:通过 DefaultFuture + RequestID 映射
  • 负载均衡:Random、RoundRobin、LeastActive
  • 容错机制:Failover、Failfast、Failsafe

面试技巧:建议画出调用链路图,强调异步转同步负载均衡机制,体现对 Dubbo 底层原理的深入理解。