核心概念

事件监听机制是Nacos实现服务变更通知、配置更新推送等核心功能的基础。Nacos采用观察者模式发布订阅模式,通过NotifyCenter统一管理事件的发布和订阅。

核心组件

  • NotifyCenter:事件通知中心,统一管理事件发布和订阅
  • EventPublisher:事件发布器,负责事件的发布和分发
  • Subscriber:事件订阅者,处理特定类型的事件

事件类型体系

1. 服务注册发现相关事件

// 服务变更事件
public class ServiceChangeEvent extends Event {
    private Service service;
    private List<Instance> instances;
}

// 实例注册事件
public class InstanceRegisterEvent extends Event {
    private String serviceName;
    private Instance instance;
}

// 实例注销事件
public class InstanceDeregisterEvent extends Event {
    private String serviceName;
    private Instance instance;
}

// 实例心跳事件
public class InstanceHeartbeatEvent extends Event {
    private Instance instance;
}

// 实例健康状态变更事件
public class InstanceHealthStatusEvent extends Event {
    private Instance instance;
    private boolean healthy;
}

2. 配置管理相关事件

// 配置变更事件
public class ConfigChangeEvent extends Event {
    private String dataId;
    private String group;
    private String content;
    private ConfigType type;
}

// 配置删除事件
public class ConfigRemoveEvent extends Event {
    private String dataId;
    private String group;
}

3. 元数据相关事件

// 实例元数据变更事件
public class InstanceMetadataEvent extends Event {
    private Service service;
    private Instance instance;
    private Map<String, String> oldMetadata;
    private Map<String, String> newMetadata;
}

NotifyCenter核心设计

1. 单例模式

// NotifyCenter.java - 事件通知中心
public class NotifyCenter {
    private static final NotifyCenter INSTANCE = new NotifyCenter();
    
    // 事件发布器Map:EventType -> EventPublisher
    private final Map<Class<? extends Event>, EventPublisher> publisherMap = new ConcurrentHashMap<>();
    
    // 订阅者Map:EventType -> List<Subscriber>
    private final Map<Class<? extends Event>, List<Subscriber>> subscriberMap = new ConcurrentHashMap<>();
    
    private NotifyCenter() {
        // 初始化默认发布器
        initDefaultPublisher();
    }
    
    public static NotifyCenter getInstance() {
        return INSTANCE;
    }
}

2. 事件发布

// NotifyCenter.java - 发布事件
public static boolean publishEvent(Event event) {
    return INSTANCE.publish(event);
}

private boolean publish(Event event) {
    if (event == null) {
        return false;
    }
    
    // 获取事件类型
    Class<? extends Event> eventType = event.getClass();
    
    // 获取对应的发布器
    EventPublisher publisher = publisherMap.get(eventType);
    if (publisher == null) {
        // 使用默认发布器
        publisher = getDefaultPublisher();
    }
    
    // 发布事件
    return publisher.publish(event);
}

3. 事件订阅

// NotifyCenter.java - 订阅事件
public static <T extends Event> void registerSubscriber(Subscriber<T> subscriber) {
    INSTANCE.addSubscriber(subscriber);
}

private <T extends Event> void addSubscriber(Subscriber<T> subscriber) {
    Class<? extends Event> eventType = subscriber.subscribeType();
    
    // 添加到订阅者列表
    subscriberMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
        .add(subscriber);
    
    // 如果事件类型已有发布器,直接关联
    EventPublisher publisher = publisherMap.get(eventType);
    if (publisher != null) {
        publisher.addSubscriber(subscriber);
    }
}

EventPublisher实现

1. DefaultPublisher(默认发布器)

// DefaultPublisher.java - 默认事件发布器
public class DefaultPublisher implements EventPublisher {
    private final BlockingQueue<Event> queue;
    private final Thread executor;
    private volatile boolean initialized = false;
    private volatile boolean shutdown = false;
    
    public DefaultPublisher() {
        // 使用有界队列,防止内存溢出
        this.queue = new ArrayBlockingQueue<>(1024);
        this.executor = new Thread(new PublisherTask(), "nacos-publisher");
    }
    
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        this.initialized = true;
        this.executor.start();
    }
    
    @Override
    public boolean publish(Event event) {
        if (shutdown) {
            return false;
        }
        
        // 非阻塞方式添加到队列
        boolean success = queue.offer(event);
        if (!success) {
            log.warn("Event queue is full, event dropped: {}", event);
        }
        
        return success;
    }
    
    // 发布任务
    private class PublisherTask implements Runnable {
        @Override
        public void run() {
            while (!shutdown) {
                try {
                    // 从队列中取出事件
                    Event event = queue.take();
                    
                    // 获取订阅者列表
                    List<Subscriber> subscribers = getSubscribers(event.getClass());
                    
                    // 通知所有订阅者
                    for (Subscriber subscriber : subscribers) {
                        try {
                            subscriber.onEvent(event);
                        } catch (Exception e) {
                            log.error("Subscriber {} handle event {} failed", 
                                subscriber, event, e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Publisher task error", e);
                }
            }
        }
    }
}

2. SharePublisher(共享发布器)

// SharePublisher.java - 共享事件发布器(多个事件类型共享一个发布器)
public class SharePublisher implements EventPublisher {
    private final Map<Class<? extends Event>, List<Subscriber>> subscriberMap = new ConcurrentHashMap<>();
    private final BlockingQueue<Event> queue;
    private final Thread executor;
    
    public SharePublisher() {
        this.queue = new ArrayBlockingQueue<>(1024);
        this.executor = new Thread(new SharePublisherTask(), "nacos-share-publisher");
    }
    
    @Override
    public boolean publish(Event event) {
        return queue.offer(event);
    }
    
    @Override
    public void addSubscriber(Subscriber subscriber) {
        Class<? extends Event> eventType = subscriber.subscribeType();
        subscriberMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
            .add(subscriber);
    }
    
    private class SharePublisherTask implements Runnable {
        @Override
        public void run() {
            while (!shutdown) {
                try {
                    Event event = queue.take();
                    List<Subscriber> subscribers = subscriberMap.get(event.getClass());
                    
                    if (subscribers != null) {
                        for (Subscriber subscriber : subscribers) {
                            subscriber.onEvent(event);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

事件订阅者实现

1. Subscriber接口

// Subscriber.java - 事件订阅者接口
public interface Subscriber<T extends Event> {
    /**
     * 订阅的事件类型
     */
    Class<? extends Event> subscribeType();
    
    /**
     * 处理事件
     */
    void onEvent(T event);
    
    /**
     * 事件处理是否忽略过期事件
     */
    default boolean ignoreExpireEvent() {
        return false;
    }
}

2. 服务变更订阅者示例

// ServiceChangeSubscriber.java - 服务变更订阅者
@Component
public class ServiceChangeSubscriber implements Subscriber<ServiceChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ServiceChangeEvent.class;
    }
    
    @Override
    public void onEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        List<Instance> instances = event.getInstances();
        
        // 更新本地服务列表缓存
        updateLocalServiceCache(service.getName(), instances);
        
        // 通知客户端服务变更
        notifyClients(service.getName(), instances);
        
        log.info("Service {} changed, instances: {}", service.getName(), instances.size());
    }
    
    private void updateLocalServiceCache(String serviceName, List<Instance> instances) {
        // 更新本地缓存
        serviceCache.put(serviceName, instances);
    }
    
    private void notifyClients(String serviceName, List<Instance> instances) {
        // 获取订阅该服务的客户端
        Set<String> subscribers = getSubscribers(serviceName);
        
        for (String clientId : subscribers) {
            // 通过UDP推送服务变更
            pushServiceInfo(clientId, serviceName, instances);
        }
    }
}

3. 配置变更订阅者示例

// ConfigChangeSubscriber.java - 配置变更订阅者
@Component
public class ConfigChangeSubscriber implements Subscriber<ConfigChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ConfigChangeEvent.class;
    }
    
    @Override
    public void onEvent(ConfigChangeEvent event) {
        String dataId = event.getDataId();
        String group = event.getGroup();
        String content = event.getContent();
        
        // 更新配置缓存
        configCache.put(buildKey(dataId, group), content);
        
        // 通知订阅该配置的客户端
        notifyConfigSubscribers(dataId, group, content);
        
        log.info("Config changed: {}/{}", group, dataId);
    }
    
    private void notifyConfigSubscribers(String dataId, String group, String content) {
        // 获取订阅该配置的客户端
        Set<String> subscribers = getConfigSubscribers(dataId, group);
        
        for (String clientId : subscribers) {
            // 通过长轮询推送配置变更
            pushConfigChange(clientId, dataId, group, content);
        }
    }
}

事件发布流程

1. 服务注册事件发布

// InstanceController.java - 服务注册时发布事件
@PostMapping("/instance")
public String register(HttpServletRequest request) {
    // 1. 注册实例
    Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    
    // 2. 发布实例注册事件
    NotifyCenter.publishEvent(new InstanceRegisterEvent(serviceName, instance));
    
    // 3. 发布服务变更事件
    Service service = serviceManager.getService(namespaceId, serviceName);
    NotifyCenter.publishEvent(new ServiceChangeEvent(service));
    
    return "ok";
}

2. 服务注销事件发布

// InstanceController.java - 服务注销时发布事件
@DeleteMapping("/instance")
public String deregister(HttpServletRequest request) {
    Instance instance = parseInstance(request);
    
    // 1. 注销实例
    serviceManager.removeInstance(namespaceId, serviceName, instance);
    
    // 2. 发布实例注销事件
    NotifyCenter.publishEvent(new InstanceDeregisterEvent(serviceName, instance));
    
    // 3. 发布服务变更事件
    Service service = serviceManager.getService(namespaceId, serviceName);
    NotifyCenter.publishEvent(new ServiceChangeEvent(service));
    
    return "ok";
}

3. 心跳事件发布

// InstanceController.java - 心跳时发布事件
@PutMapping("/beat")
public JsonNode beat(HttpServletRequest request) {
    BeatInfo beatInfo = parseBeatInfo(request);
    Instance instance = serviceManager.getInstance(beatInfo);
    
    // 更新心跳时间
    instance.setLastBeat(System.currentTimeMillis());
    
    // 发布心跳事件
    NotifyCenter.publishEvent(new InstanceHeartbeatEvent(instance));
    
    return buildBeatResponse(instance);
}

事件监听的应用场景

1. 服务变更通知客户端

// ServiceChangeNotifier.java - 服务变更通知器
@Component
public class ServiceChangeNotifier implements Subscriber<ServiceChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ServiceChangeEvent.class;
    }
    
    @Override
    public void onEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        
        // 获取订阅该服务的客户端
        Set<PushClient> clients = getSubscribedClients(service);
        
        // 通过UDP推送服务变更
        for (PushClient client : clients) {
            try {
                pushServiceInfo(client, service);
            } catch (Exception e) {
                log.error("Push service info to client {} failed", client, e);
            }
        }
    }
}

2. 配置变更推送

// ConfigChangeNotifier.java - 配置变更通知器
@Component
public class ConfigChangeNotifier implements Subscriber<ConfigChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ConfigChangeEvent.class;
    }
    
    @Override
    public void onEvent(ConfigChangeEvent event) {
        String dataId = event.getDataId();
        String group = event.getGroup();
        
        // 获取订阅该配置的客户端
        Set<LongPollingClient> clients = getConfigSubscribers(dataId, group);
        
        // 通过长轮询推送配置变更
        for (LongPollingClient client : clients) {
            try {
                client.sendResponse(dataId, group, event.getContent());
            } catch (Exception e) {
                log.error("Push config to client {} failed", client, e);
            }
        }
    }
}

3. 监控指标收集

// MetricsCollector.java - 监控指标收集器
@Component
public class MetricsCollector implements Subscriber<InstanceHeartbeatEvent> {
    private final MeterRegistry meterRegistry;
    
    @Override
    public Class<? extends Event> subscribeType() {
        return InstanceHeartbeatEvent.class;
    }
    
    @Override
    public void onEvent(InstanceHeartbeatEvent event) {
        Instance instance = event.getInstance();
        
        // 记录心跳指标
        meterRegistry.counter("nacos.heartbeat.total",
            "service", instance.getServiceName(),
            "ip", instance.getIp()
        ).increment();
    }
}

性能优化

1. 异步事件处理

// 事件发布采用异步方式,不阻塞主流程
public boolean publish(Event event) {
    // 非阻塞方式添加到队列
    return queue.offer(event);
}

2. 事件去重

// 相同事件去重,避免重复处理
public class EventDeduplicator {
    private final Set<String> processedEvents = new ConcurrentHashMap<>().newKeySet();
    
    public boolean shouldProcess(Event event) {
        String key = buildEventKey(event);
        
        // 如果已处理,跳过
        if (processedEvents.contains(key)) {
            return false;
        }
        
        // 标记为已处理
        processedEvents.add(key);
        
        // 定期清理过期事件
        cleanupExpiredEvents();
        
        return true;
    }
}

3. 订阅者优先级

// 支持订阅者优先级,重要订阅者优先处理
public interface Subscriber<T extends Event> {
    /**
     * 订阅者优先级(数字越小优先级越高)
     */
    default int priority() {
        return 0;
    }
}

// 按优先级排序订阅者
List<Subscriber> subscribers = getSubscribers(eventType);
subscribers.sort(Comparator.comparingInt(Subscriber::priority));

for (Subscriber subscriber : subscribers) {
    subscriber.onEvent(event);
}

分布式场景考量

1. 集群事件同步

// 集群环境下,事件需要同步到其他节点
public class ClusterEventPublisher implements EventPublisher {
    @Override
    public boolean publish(Event event) {
        // 1. 本地发布
        localPublish(event);
        
        // 2. 同步到其他节点
        if (event instanceof ClusterEvent) {
            syncToOtherNodes(event);
        }
        
        return true;
    }
    
    private void syncToOtherNodes(Event event) {
        for (Member member : clusterMembers) {
            if (!member.isSelf()) {
                distroProtocol.sync(event, member);
            }
        }
    }
}

2. 事件顺序保证

// 对于需要保证顺序的事件,使用单线程处理
public class OrderedEventPublisher implements EventPublisher {
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Thread executor = new Thread(new OrderedPublisherTask());
    
    private class OrderedPublisherTask implements Runnable {
        @Override
        public void run() {
            while (!shutdown) {
                try {
                    Event event = queue.take();
                    // 单线程顺序处理
                    processEvent(event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

实战示例

1. 自定义事件订阅

// 自定义服务变更监听器
@Component
public class CustomServiceChangeListener implements Subscriber<ServiceChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ServiceChangeEvent.class;
    }
    
    @Override
    public void onEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        List<Instance> instances = event.getInstances();
        
        // 自定义处理逻辑
        log.info("Service {} changed, total instances: {}", 
            service.getName(), instances.size());
        
        // 更新负载均衡器
        updateLoadBalancer(service.getName(), instances);
        
        // 发送告警(如果实例数变化较大)
        checkAndAlert(service, instances);
    }
    
    @PostConstruct
    public void init() {
        // 注册订阅者
        NotifyCenter.registerSubscriber(this);
    }
}

2. 配置变更监听

// 配置变更监听器
@Component
public class ConfigChangeListener implements Subscriber<ConfigChangeEvent> {
    @Override
    public Class<? extends Event> subscribeType() {
        return ConfigChangeEvent.class;
    }
    
    @Override
    public void onEvent(ConfigChangeEvent event) {
        String dataId = event.getDataId();
        String group = event.getGroup();
        String content = event.getContent();
        
        // 重新加载配置
        reloadConfig(dataId, group, content);
        
        // 通知相关组件
        notifyComponents(dataId, group);
    }
    
    @PostConstruct
    public void init() {
        NotifyCenter.registerSubscriber(this);
    }
}

面试总结

Nacos事件监听机制核心要点

  1. 核心组件
    • NotifyCenter:事件通知中心,单例模式
    • EventPublisher:事件发布器,异步处理
    • Subscriber:事件订阅者,处理特定事件
  2. 事件类型
    • 服务注册发现:ServiceChangeEvent、InstanceRegisterEvent等
    • 配置管理:ConfigChangeEvent、ConfigRemoveEvent
    • 元数据:InstanceMetadataEvent
  3. 发布订阅模式
    • 发布:NotifyCenter.publishEvent(event)
    • 订阅:NotifyCenter.registerSubscriber(subscriber)
    • 异步处理,不阻塞主流程
  4. 性能优化
    • 异步事件处理
    • 事件去重
    • 订阅者优先级
  5. 应用场景
    • 服务变更通知客户端
    • 配置变更推送
    • 监控指标收集

技术亮点

  • 统一的NotifyCenter管理所有事件
  • 支持多种事件发布器(DefaultPublisher、SharePublisher)
  • 异步处理,高性能
  • 完善的订阅者管理机制

设计模式

  • 观察者模式:事件发布和订阅
  • 单例模式:NotifyCenter单例
  • 策略模式:不同的事件发布器策略