核心概念

健康检查是服务注册中心的核心功能,用于检测服务实例的可用性,及时剔除不可用的实例。Nacos根据实例类型(临时实例/持久化实例)采用不同的健康检查机制。

两种健康检查模式

  • 临时实例(AP模式):客户端主动心跳上报
  • 持久化实例(CP模式):服务端主动健康探测

临时实例健康检查(客户端心跳)

1. 心跳机制

// 客户端配置
spring:
  cloud:
    nacos:
      discovery:
        ephemeral: true  # 临时实例
        heart-beat-interval: 5000      # 心跳间隔5秒
        heart-beat-timeout: 15000      # 心跳超时15秒
        ip-delete-timeout: 30000       # 实例删除超时30秒

心跳流程

  1. 客户端启动后,每5秒发送一次心跳到Nacos Server
  2. Server收到心跳后,更新实例的最后心跳时间
  3. 如果15秒未收到心跳,标记实例为不健康
  4. 如果30秒未收到心跳,从注册表中删除实例

2. 客户端心跳实现

// NacosNamingService.java - 心跳任务
public class BeatReactor {
    private final ScheduledExecutorService executorService;
    private final Map<String, BeatInfo> beatInfoMap = new ConcurrentHashMap<>();
    
    // 添加心跳任务
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        beatInfoMap.put(key, beatInfo);
        
        // 启动定时任务
        executorService.scheduleAtFixedRate(
            new BeatTask(beatInfo),
            0,
            beatInfo.getPeriod(),  // 默认5秒
            TimeUnit.MILLISECONDS
        );
    }
    
    // 心跳任务
    private class BeatTask implements Runnable {
        private final BeatInfo beatInfo;
        
        @Override
        public void run() {
            try {
                // 发送心跳请求
                String result = serverProxy.sendBeat(beatInfo);
                
                // 解析响应,更新心跳间隔
                if (StringUtils.isNotBlank(result)) {
                    BeatInfo serverBeatInfo = JSON.parseObject(result, BeatInfo.class);
                    if (serverBeatInfo.getPeriod() > 0) {
                        beatInfo.setPeriod(serverBeatInfo.getPeriod());
                    }
                }
            } catch (Exception e) {
                // 心跳失败,记录日志
                log.error("Send beat failed", e);
            }
        }
    }
}

// 心跳请求
public String sendBeat(BeatInfo beatInfo) {
    Map<String, String> params = new HashMap<>();
    params.put("serviceName", beatInfo.getServiceName());
    params.put("beat", JSON.toJSONString(beatInfo));
    
    // POST /nacos/v1/ns/instance/beat
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, HttpMethod.POST);
}

3. 服务端心跳处理

// InstanceController.java - 心跳接口
@PutMapping("/beat")
public JsonNode beat(HttpServletRequest request) {
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String beat = WebUtils.required(request, "beat");
    
    // 解析心跳信息
    BeatInfo beatInfo = JSON.parseObject(beat, BeatInfo.class);
    
    // 更新实例心跳时间
    Instance instance = serviceManager.getInstance(
        beatInfo.getNamespaceId(),
        serviceName,
        beatInfo.getIp(),
        beatInfo.getPort()
    );
    
    if (instance != null) {
        // 更新最后心跳时间
        instance.setLastBeat(System.currentTimeMillis());
        instance.setHealthy(true);
        
        // 触发服务变更事件
        NotifyCenter.publishEvent(new InstanceHeartbeatEvent(instance));
    }
    
    // 返回心跳间隔(服务端可动态调整)
    BeatInfo result = new BeatInfo();
    result.setPeriod(instance.getHeartBeatInterval());
    return JSON.parseObject(JSON.toJSONString(result));
}

4. 健康检查任务

// ServiceManager.java - 健康检查任务
public class ServiceManager {
    private final ScheduledExecutorService healthCheckExecutor;
    
    @PostConstruct
    public void init() {
        // 启动健康检查任务,每5秒执行一次
        healthCheckExecutor.scheduleAtFixedRate(
            this::healthCheck,
            0,
            5,
            TimeUnit.SECONDS
        );
    }
    
    private void healthCheck() {
        // 遍历所有临时实例
        for (Service service : getAllServices()) {
            for (Cluster cluster : service.getClusterMap().values()) {
                Set<Instance> instances = cluster.getEphemeralInstances();
                
                for (Instance instance : instances) {
                    long currentTime = System.currentTimeMillis();
                    long lastBeat = instance.getLastBeat();
                    
                    // 15秒未心跳,标记不健康
                    if (currentTime - lastBeat > instance.getHeartBeatTimeout()) {
                        instance.setHealthy(false);
                        log.warn("Instance {} is unhealthy", instance);
                    }
                    
                    // 30秒未心跳,删除实例
                    if (currentTime - lastBeat > instance.getIpDeleteTimeout()) {
                        removeInstance(service, instance);
                        log.warn("Instance {} is removed", instance);
                    }
                }
            }
        }
    }
}

持久化实例健康检查(服务端主动探测)

1. 主动探测机制

// 持久化实例配置
spring:
  cloud:
    nacos:
      discovery:
        ephemeral: false  # 持久化实例
        metadata:
          preserved.heart.beat.interval: 5000
          preserved.heart.beat.timeout: 15000
          preserved.ip.delete.timeout: 30000
          # 健康检查类型
          preserved.health.check.type: TCP  # TCP/HTTP/MySQL
          preserved.health.check.url: http://192.168.1.10:8080/health
          preserved.health.check.port: 8080

支持的健康检查类型

  • TCP:TCP连接检查
  • HTTP:HTTP请求检查
  • MySQL:数据库连接检查

2. TCP健康检查

// HealthCheckProcessor.java - TCP检查
public class TcpHealthCheckProcessor implements HealthCheckProcessor {
    @Override
    public boolean processHealthCheck(Instance instance) {
        String ip = instance.getIp();
        int port = instance.getPort();
        
        try (Socket socket = new Socket()) {
            // 设置连接超时
            socket.connect(new InetSocketAddress(ip, port), 3000);
            return socket.isConnected();
        } catch (IOException e) {
            log.warn("TCP health check failed for {}:{}", ip, port, e);
            return false;
        }
    }
}

3. HTTP健康检查

// HttpHealthCheckProcessor.java - HTTP检查
public class HttpHealthCheckProcessor implements HealthCheckProcessor {
    private final RestTemplate restTemplate;
    
    @Override
    public boolean processHealthCheck(Instance instance) {
        String healthCheckUrl = instance.getMetadata().get("preserved.health.check.url");
        if (StringUtils.isBlank(healthCheckUrl)) {
            // 默认使用 /actuator/health
            healthCheckUrl = "http://" + instance.getIp() + ":" + instance.getPort() + "/actuator/health";
        }
        
        try {
            ResponseEntity<String> response = restTemplate.getForEntity(
                healthCheckUrl,
                String.class
            );
            
            // 2xx状态码认为健康
            return response.getStatusCode().is2xxSuccessful();
        } catch (Exception e) {
            log.warn("HTTP health check failed for {}", healthCheckUrl, e);
            return false;
        }
    }
}

4. MySQL健康检查

// MysqlHealthCheckProcessor.java - MySQL检查
public class MysqlHealthCheckProcessor implements HealthCheckProcessor {
    @Override
    public boolean processHealthCheck(Instance instance) {
        String jdbcUrl = instance.getMetadata().get("preserved.health.check.mysql.url");
        String username = instance.getMetadata().get("preserved.health.check.mysql.user");
        String password = instance.getMetadata().get("preserved.health.check.mysql.password");
        
        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
            // 执行简单查询
            try (Statement stmt = conn.createStatement()) {
                stmt.executeQuery("SELECT 1");
            }
            return true;
        } catch (SQLException e) {
            log.warn("MySQL health check failed", e);
            return false;
        }
    }
}

5. 健康检查调度

// HealthCheckTask.java - 健康检查任务
public class HealthCheckTask implements Runnable {
    private final Instance instance;
    private final HealthCheckProcessor processor;
    
    @Override
    public void run() {
        try {
            // 执行健康检查
            boolean healthy = processor.processHealthCheck(instance);
            
            // 更新健康状态
            if (healthy != instance.isHealthy()) {
                instance.setHealthy(healthy);
                
                // 触发健康状态变更事件
                NotifyCenter.publishEvent(new InstanceHealthStatusEvent(instance));
            }
            
            // 注意:持久化实例不健康时不会删除,需要手动下线
        } catch (Exception e) {
            log.error("Health check error", e);
        }
    }
}

// HealthCheckReactor.java - 健康检查调度器
public class HealthCheckReactor {
    private final ScheduledExecutorService executorService;
    private final Map<String, ScheduledFuture<?>> healthCheckTasks = new ConcurrentHashMap<>();
    
    public void scheduleCheck(Instance instance) {
        String key = buildKey(instance);
        
        // 获取健康检查类型
        String checkType = instance.getMetadata().get("preserved.health.check.type");
        HealthCheckProcessor processor = getProcessor(checkType);
        
        // 获取检查间隔
        long interval = getCheckInterval(instance);
        
        // 启动定时检查任务
        ScheduledFuture<?> future = executorService.scheduleAtFixedRate(
            new HealthCheckTask(instance, processor),
            0,
            interval,
            TimeUnit.MILLISECONDS
        );
        
        healthCheckTasks.put(key, future);
    }
}

健康检查状态流转

1. 临时实例状态流转

注册成功 → UP (健康)
    ↓
心跳正常 → UP (健康)
    ↓
15秒未心跳 → DOWN (不健康,但仍在注册表)
    ↓
30秒未心跳 → 从注册表删除

2. 持久化实例状态流转

注册成功 → UP (健康)
    ↓
健康检查通过 → UP (健康)
    ↓
健康检查失败 → DOWN (不健康,但仍在注册表)
    ↓
手动下线 → 从注册表删除

关键区别

  • 临时实例:不健康超过30秒自动删除
  • 持久化实例:不健康时不会自动删除,需要手动下线

性能优化与线程安全

1. 异步健康检查

// 健康检查采用异步方式,不阻塞主线程
public class HealthCheckReactor {
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10,  // 核心线程数
        50,  // 最大线程数
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        new ThreadFactoryBuilder().setNameFormat("health-check-%d").build()
    );
    
    public void checkAsync(Instance instance) {
        executor.submit(() -> {
            processHealthCheck(instance);
        });
    }
}

2. 健康检查限流

// 防止健康检查请求过多
public class HealthCheckReactor {
    private final RateLimiter rateLimiter = RateLimiter.create(100);  // 每秒100个请求
    
    public void checkWithRateLimit(Instance instance) {
        if (rateLimiter.tryAcquire()) {
            processHealthCheck(instance);
        } else {
            log.warn("Health check rate limit exceeded");
        }
    }
}

3. 批量健康检查

// 批量检查,减少网络开销
public class BatchHealthCheckProcessor {
    public void batchCheck(List<Instance> instances) {
        // 按IP分组
        Map<String, List<Instance>> grouped = instances.stream()
            .collect(Collectors.groupingBy(Instance::getIp));
        
        // 批量检查同一IP的不同端口
        for (Map.Entry<String, List<Instance>> entry : grouped.entrySet()) {
            String ip = entry.getKey();
            List<Instance> ipInstances = entry.getValue();
            
            // 复用连接,批量检查
            batchCheckForIp(ip, ipInstances);
        }
    }
}

分布式场景考量

1. 健康检查的分布式一致性

临时实例(AP模式)

  • 各节点独立进行健康检查
  • 心跳信息通过Distro协议异步同步
  • 可能出现短暂的不一致(某个节点已删除,其他节点还未删除)

持久化实例(CP模式)

  • 健康检查结果通过Raft协议同步
  • 保证所有节点健康状态一致
  • 但检查任务可能在不同节点执行

2. 健康检查的容错

// 健康检查失败重试
public class HealthCheckTask {
    private static final int MAX_RETRY = 3;
    
    @Override
    public void run() {
        int retry = 0;
        boolean healthy = false;
        
        while (retry < MAX_RETRY && !healthy) {
            try {
                healthy = processor.processHealthCheck(instance);
            } catch (Exception e) {
                retry++;
                if (retry < MAX_RETRY) {
                    // 指数退避
                    Thread.sleep(1000 * (1 << retry));
                }
            }
        }
        
        // 更新健康状态
        instance.setHealthy(healthy);
    }
}

3. 健康检查的监控

// 健康检查指标监控
public class HealthCheckMetrics {
    private final MeterRegistry meterRegistry;
    
    public void recordHealthCheck(Instance instance, boolean healthy, long duration) {
        // 记录检查次数
        meterRegistry.counter("nacos.health.check.total",
            "service", instance.getServiceName(),
            "type", instance.isEphemeral() ? "ephemeral" : "persistent"
        ).increment();
        
        // 记录检查耗时
        meterRegistry.timer("nacos.health.check.duration",
            "service", instance.getServiceName()
        ).record(duration, TimeUnit.MILLISECONDS);
        
        // 记录健康状态
        meterRegistry.gauge("nacos.health.check.status",
            Tags.of("service", instance.getServiceName(), "healthy", String.valueOf(healthy)),
            healthy ? 1 : 0
        );
    }
}

实战示例

1. 临时实例健康检查配置

spring:
  cloud:
    nacos:
      discovery:
        ephemeral: true
        heart-beat-interval: 5000      # 心跳间隔5秒
        heart-beat-timeout: 15000      # 15秒未心跳标记不健康
        ip-delete-timeout: 30000       # 30秒未心跳删除实例

2. 持久化实例健康检查配置

spring:
  cloud:
    nacos:
      discovery:
        ephemeral: false
        metadata:
          # TCP健康检查
          preserved.health.check.type: TCP
          preserved.health.check.port: 8080
          
          # 或HTTP健康检查
          # preserved.health.check.type: HTTP
          # preserved.health.check.url: http://${spring.cloud.client.ip-address}:${server.port}/actuator/health
          
          # 或MySQL健康检查
          # preserved.health.check.type: MySQL
          # preserved.health.check.mysql.url: jdbc:mysql://localhost:3306/test
          # preserved.health.check.mysql.user: root
          # preserved.health.check.mysql.password: password

3. 自定义健康检查端点

@RestController
public class HealthController {
    @GetMapping("/actuator/health")
    public Map<String, Object> health() {
        Map<String, Object> result = new HashMap<>();
        
        // 检查数据库连接
        boolean dbHealthy = checkDatabase();
        
        // 检查Redis连接
        boolean redisHealthy = checkRedis();
        
        // 综合健康状态
        boolean healthy = dbHealthy && redisHealthy;
        
        result.put("status", healthy ? "UP" : "DOWN");
        result.put("db", dbHealthy ? "UP" : "DOWN");
        result.put("redis", redisHealthy ? "UP" : "DOWN");
        
        return result;
    }
    
    private boolean checkDatabase() {
        try {
            // 执行简单查询
            jdbcTemplate.queryForObject("SELECT 1", Integer.class);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    
    private boolean checkRedis() {
        try {
            redisTemplate.hasKey("health-check");
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

面试总结

Nacos健康检查机制核心要点

  1. 双模式健康检查
    • 临时实例:客户端心跳(5秒间隔),15秒未心跳不健康,30秒未心跳删除
    • 持久化实例:服务端主动探测(TCP/HTTP/MySQL),不健康不删除
  2. 心跳机制
    • 客户端定时发送心跳(默认5秒)
    • 服务端更新最后心跳时间
    • 健康检查任务定期扫描,超时标记不健康或删除
  3. 主动探测
    • 支持TCP、HTTP、MySQL三种方式
    • 服务端定时执行健康检查
    • 检查结果通过Raft协议同步(CP模式)
  4. 性能优化
    • 异步健康检查,不阻塞主线程
    • 限流保护,防止请求过多
    • 批量检查,减少网络开销
  5. 容错设计
    • 健康检查失败重试
    • 指数退避策略
    • 监控指标收集

技术亮点

  • 临时实例和持久化实例采用不同的健康检查策略
  • 支持多种健康检查方式(TCP/HTTP/MySQL)
  • 异步+限流+批量,保证高性能
  • 完善的监控和容错机制