核心概念
Dubbo 服务调用流程可以分为三个阶段:
- 启动阶段:服务注册与发现
- 调用阶段:从方法调用到网络传输
- 响应阶段:接收响应并返回结果
整个过程涉及代理拦截、路由选择、负载均衡、协议封装、网络传输、序列化反序列化等多个环节。
完整调用流程
阶段一:启动与准备
// 1. Provider 启动:暴露服务
@Service
public class UserServiceImpl implements UserService {
public User getUser(Long userId) {
return userMapper.selectById(userId);
}
}
// 启动流程:
// 1.1 Spring 扫描 @Service 注解
// 1.2 创建 ServiceConfig 对象
// 1.3 通过 Protocol 暴露服务(启动 Netty Server)
// 1.4 向 Registry 注册服务
// dubbo://192.168.1.100:20880/com.example.UserService?timeout=3000&...
// 2. Consumer 启动:引用服务
@Reference
private UserService userService;
// 启动流程:
// 2.1 Spring 扫描 @Reference 注解
// 2.2 创建 ReferenceConfig 对象
// 2.3 向 Registry 订阅服务
// 2.4 获取 Provider 地址列表
// 2.5 创建动态代理对象(Javassist/JDK Proxy)
// 2.6 与 Provider 建立 TCP 长连接
阶段二:调用过程(Consumer 端)
// 用户代码调用
User user = userService.getUser(123L);
// ==================== Consumer 端调用链路 ====================
// 【1. Proxy 层】动态代理拦截
InvokerInvocationHandler.invoke() {
// 构造 RpcInvocation 对象
RpcInvocation invocation = new RpcInvocation();
invocation.setMethodName("getUser");
invocation.setParameterTypes(new Class[]{Long.class});
invocation.setArguments(new Object[]{123L});
// 调用 Invoker
return invoker.invoke(invocation);
}
// 【2. Filter 链】执行过滤器
ConsumerContextFilter.invoke() {
// 设置上下文信息(调用方 IP、时间戳等)
RpcContext.getContext().setRemoteAddress(...);
return invoker.invoke(invocation);
}
MonitorFilter.invoke() {
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
// 记录调用统计
monitor.collect(new Statistics(...));
return result;
}
// 【3. Cluster 层】集群容错
FailoverClusterInvoker.invoke() {
// 获取所有可用的 Invoker(Provider 列表)
List<Invoker> invokers = directory.list(invocation);
// invokers = [Invoker1(192.168.1.100:20880), Invoker2(192.168.1.101:20880)]
// 负载均衡选择一个 Invoker
LoadBalance loadBalance = getLoadBalance();
Invoker selectedInvoker = loadBalance.select(invokers, invocation);
// 调用,如果失败则重试
try {
return selectedInvoker.invoke(invocation);
} catch (RpcException e) {
// Failover:失败后切换到其他 Provider
selectedInvoker = loadBalance.select(invokers, invocation);
return selectedInvoker.invoke(invocation);
}
}
// 【4. Protocol 层】远程调用
DubboInvoker.invoke() {
// 选择一个可用的 Client(NettyClient)
ExchangeClient client = getClients()[0];
// 异步发送请求
CompletableFuture<Result> future = client.request(invocation);
// 同步等待结果
return future.get(timeout, TimeUnit.MILLISECONDS);
}
// 【5. Exchange 层】信息交换
HeaderExchangeChannel.request() {
// 构造 Request 对象
Request request = new Request();
request.setId(AtomicLong.getAndIncrement()); // 生成唯一请求ID
request.setData(invocation);
request.setTwoWay(true); // 需要响应
// 创建 Future,等待响应
DefaultFuture future = new DefaultFuture(channel, request, timeout);
FUTURES.put(request.getId(), future); // 保存到全局Map
// 发送请求
channel.send(request);
return future;
}
// 【6. Codec 层】编码
DubboCodec.encode() {
// 编码格式:
// +------+------+------+------+--------+--------+--------+--------+
// | Magic| Flag | Status| ReqID (8字节) | Data Length |
// +------+------+------+------+--------+--------+--------+--------+
// | Serialized Data (N 字节) |
// +------+------+------+------+--------+--------+--------+--------+
buffer.writeShort(0xdabb); // 魔数
buffer.writeByte(flag);
buffer.writeLong(request.getId());
// 序列化数据
byte[] data = serialization.serialize(invocation);
buffer.writeInt(data.length);
buffer.writeBytes(data);
}
// 【7. Serialize 层】序列化
Hessian2Serialization.serialize() {
// 将 RpcInvocation 对象序列化为字节数组
// methodName: "getUser"
// parameterTypes: [Long.class]
// arguments: [123L]
return bytes;
}
// 【8. Transport 层】网络传输
NettyChannel.send() {
// 通过 Netty 发送数据
channel.writeAndFlush(message);
}
阶段三:处理与响应(Provider 端)
// ==================== Provider 端处理链路 ====================
// 【1. Transport 层】接收请求
NettyServerHandler.channelRead() {
// Netty 接收到字节流
received(channel, message);
}
// 【2. Codec 层】解码
DubboCodec.decode() {
// 读取魔数、请求ID、数据长度
short magic = buffer.readShort();
long requestId = buffer.readLong();
int dataLength = buffer.readInt();
byte[] data = buffer.readBytes(dataLength);
// 反序列化
RpcInvocation invocation = serialization.deserialize(data);
Request request = new Request(requestId);
request.setData(invocation);
return request;
}
// 【3. Exchange 层】处理请求
HeaderExchangeHandler.received() {
// 提交到业务线程池处理
ExecutorService executor = getExecutor();
executor.execute(() -> handleRequest(channel, request));
}
// 【4. Filter 链】Provider 端过滤器
ProviderContextFilter.invoke() {
// 设置 Provider 端上下文
RpcContext.getContext().setRemoteAddress(channel.remoteAddress());
return invoker.invoke(invocation);
}
ExecuteLimitFilter.invoke() {
// 检查并发限制
int max = getUrl().getParameter("executes", 0);
if (max > 0 && count.get() >= max) {
throw new RpcException("超过最大并发数");
}
count.incrementAndGet();
try {
return invoker.invoke(invocation);
} finally {
count.decrementAndGet();
}
}
// 【5. Invoker 调用】执行真实服务
DubboProtocol.reply() {
// 根据接口和方法名找到真实的服务实现
Invoker invoker = exporterMap.get(serviceKey);
// 调用真实方法
return invoker.invoke(invocation);
}
// 【6. 真实方法执行】
UserServiceImpl.getUser(123L) {
// 执行业务逻辑
User user = userMapper.selectById(123L);
return user;
}
// 【7. 构造响应】
Response response = new Response();
response.setId(request.getId()); // 使用请求的ID
response.setStatus(Response.OK);
response.setResult(user); // 返回结果
// 【8. 编码并返回】
DubboCodec.encode(response);
channel.writeAndFlush(response); // 通过 Netty 发送响应
阶段四:接收响应(Consumer 端)
// ==================== Consumer 端接收响应 ====================
// 【1. Transport 层】接收响应
NettyClientHandler.channelRead() {
received(channel, message);
}
// 【2. Codec 层】解码响应
Response response = DubboCodec.decode(buffer);
// 【3. Exchange 层】处理响应
HeaderExchangeHandler.received() {
// 根据请求ID找到对应的 Future
DefaultFuture future = FUTURES.remove(response.getId());
// 设置结果,唤醒等待线程
future.complete(response.getResult());
}
// 【4. 返回结果】
// 用户线程从 future.get() 返回
User user = (User) future.get();
// 【5. 返回给调用方】
return user;
核心机制详解
1. 异步转同步
// Consumer 发送请求后,如何等待响应?
public class DefaultFuture extends CompletableFuture<Object> {
// 全局 Map,保存所有等待响应的 Future
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
public DefaultFuture(Channel channel, Request request, int timeout) {
this.request = request;
this.timeout = timeout;
FUTURES.put(request.getId(), this); // 保存到全局Map
// 超时处理
TIMEOUT_EXECUTOR.schedule(() -> {
DefaultFuture future = FUTURES.remove(request.getId());
if (future != null) {
future.completeExceptionally(new TimeoutException());
}
}, timeout, TimeUnit.MILLISECONDS);
}
// 接收到响应时调用
public static void received(Response response) {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.complete(response.getResult());
}
}
}
// 调用流程:
// 1. 发送请求,生成 RequestID=100
// 2. 创建 DefaultFuture,放入 FUTURES[100]
// 3. future.get() 阻塞等待
// 4. 收到响应,ResponseID=100
// 5. 从 FUTURES[100] 取出 Future
// 6. future.complete(),唤醒等待线程
2. 负载均衡策略
// Random(随机)
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
int length = invokers.size();
return invokers.get(random.nextInt(length));
}
}
// RoundRobin(轮询)
public class RoundRobinLoadBalance extends AbstractLoadBalance {
private final AtomicInteger index = new AtomicInteger(0);
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
return invokers.get(Math.abs(index.getAndIncrement() % invokers.size()));
}
}
// LeastActive(最少活跃数)
public class LeastActiveLoadBalance extends AbstractLoadBalance {
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, Invocation invocation) {
// 选择活跃请求数最少的 Invoker
int leastActive = Integer.MAX_VALUE;
Invoker<T> selected = null;
for (Invoker<T> invoker : invokers) {
int active = RpcStatus.getStatus(invoker.getUrl()).getActive();
if (active < leastActive) {
leastActive = active;
selected = invoker;
}
}
return selected;
}
}
3. 容错机制
// Failover(失败自动切换,默认)
@Reference(cluster = "failover", retries = 2)
private UserService userService;
// Failfast(快速失败,不重试)
@Reference(cluster = "failfast")
private UserService userService;
// Failsafe(失败安全,忽略异常)
@Reference(cluster = "failsafe")
private UserService userService;
// Failback(失败自动恢复,后台记录失败请求,定时重发)
@Reference(cluster = "failback")
private UserService userService;
// Forking(并行调用多个服务器,只要一个成功即返回)
@Reference(cluster = "forking", forks = 2)
private UserService userService;
性能优化点
1. 连接复用
// 长连接 + 连接池
@Reference(
connections = 10, // 每个 Provider 10 个连接
lazy = false // 启动时建立连接
)
private UserService userService;
2. 异步调用
// 同步调用(阻塞)
User user = userService.getUser(123L);
// 异步调用(非阻塞)
CompletableFuture<User> future = RpcContext.getContext()
.asyncCall(() -> userService.getUser(123L));
future.thenAccept(user -> {
// 异步处理结果
System.out.println(user);
});
3. 序列化选择
// 性能对比:Kryo > Hessian2 > JSON
@Service(serialization = "kryo") // 最快,但需要注册类
@Service(serialization = "hessian2") // 默认,性能与兼容性平衡
@Service(serialization = "protobuf") // 跨语言友好
答题总结
Dubbo 服务调用的完整流程:
1. 启动阶段:
- Provider 暴露服务并注册到注册中心
- Consumer 订阅服务并创建动态代理
2. Consumer 端调用链:
- Proxy:动态代理拦截方法调用
- Filter:执行过滤器链(监控、上下文、限流等)
- Cluster:负载均衡选择 Provider,容错重试
- Protocol:选择 Client 发起调用
- Exchange:构造 Request,创建 Future
- Codec:编码为字节流
- Serialize:序列化参数
- Transport:Netty 发送数据
3. Provider 端处理链:
- Transport:Netty 接收数据
- Codec:解码字节流
- Exchange:提交到业务线程池
- Filter:执行 Provider 端过滤器
- Invoker:调用真实服务方法
- 响应:编码并返回结果
4. Consumer 端接收响应:
- 解码响应,根据 RequestID 找到 Future
- 唤醒等待线程,返回结果
核心机制:
- 异步转同步:通过 DefaultFuture + RequestID 映射
- 负载均衡:Random、RoundRobin、LeastActive
- 容错机制:Failover、Failfast、Failsafe
面试技巧:建议画出调用链路图,强调异步转同步和负载均衡机制,体现对 Dubbo 底层原理的深入理解。