核心概念
事件监听机制是Nacos实现服务变更通知、配置更新推送等核心功能的基础。Nacos采用观察者模式和发布订阅模式,通过NotifyCenter统一管理事件的发布和订阅。
核心组件:
- NotifyCenter:事件通知中心,统一管理事件发布和订阅
- EventPublisher:事件发布器,负责事件的发布和分发
- Subscriber:事件订阅者,处理特定类型的事件
事件类型体系
1. 服务注册发现相关事件
// 服务变更事件
public class ServiceChangeEvent extends Event {
private Service service;
private List<Instance> instances;
}
// 实例注册事件
public class InstanceRegisterEvent extends Event {
private String serviceName;
private Instance instance;
}
// 实例注销事件
public class InstanceDeregisterEvent extends Event {
private String serviceName;
private Instance instance;
}
// 实例心跳事件
public class InstanceHeartbeatEvent extends Event {
private Instance instance;
}
// 实例健康状态变更事件
public class InstanceHealthStatusEvent extends Event {
private Instance instance;
private boolean healthy;
}
2. 配置管理相关事件
// 配置变更事件
public class ConfigChangeEvent extends Event {
private String dataId;
private String group;
private String content;
private ConfigType type;
}
// 配置删除事件
public class ConfigRemoveEvent extends Event {
private String dataId;
private String group;
}
3. 元数据相关事件
// 实例元数据变更事件
public class InstanceMetadataEvent extends Event {
private Service service;
private Instance instance;
private Map<String, String> oldMetadata;
private Map<String, String> newMetadata;
}
NotifyCenter核心设计
1. 单例模式
// NotifyCenter.java - 事件通知中心
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
// 事件发布器Map:EventType -> EventPublisher
private final Map<Class<? extends Event>, EventPublisher> publisherMap = new ConcurrentHashMap<>();
// 订阅者Map:EventType -> List<Subscriber>
private final Map<Class<? extends Event>, List<Subscriber>> subscriberMap = new ConcurrentHashMap<>();
private NotifyCenter() {
// 初始化默认发布器
initDefaultPublisher();
}
public static NotifyCenter getInstance() {
return INSTANCE;
}
}
2. 事件发布
// NotifyCenter.java - 发布事件
public static boolean publishEvent(Event event) {
return INSTANCE.publish(event);
}
private boolean publish(Event event) {
if (event == null) {
return false;
}
// 获取事件类型
Class<? extends Event> eventType = event.getClass();
// 获取对应的发布器
EventPublisher publisher = publisherMap.get(eventType);
if (publisher == null) {
// 使用默认发布器
publisher = getDefaultPublisher();
}
// 发布事件
return publisher.publish(event);
}
3. 事件订阅
// NotifyCenter.java - 订阅事件
public static <T extends Event> void registerSubscriber(Subscriber<T> subscriber) {
INSTANCE.addSubscriber(subscriber);
}
private <T extends Event> void addSubscriber(Subscriber<T> subscriber) {
Class<? extends Event> eventType = subscriber.subscribeType();
// 添加到订阅者列表
subscriberMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
.add(subscriber);
// 如果事件类型已有发布器,直接关联
EventPublisher publisher = publisherMap.get(eventType);
if (publisher != null) {
publisher.addSubscriber(subscriber);
}
}
EventPublisher实现
1. DefaultPublisher(默认发布器)
// DefaultPublisher.java - 默认事件发布器
public class DefaultPublisher implements EventPublisher {
private final BlockingQueue<Event> queue;
private final Thread executor;
private volatile boolean initialized = false;
private volatile boolean shutdown = false;
public DefaultPublisher() {
// 使用有界队列,防止内存溢出
this.queue = new ArrayBlockingQueue<>(1024);
this.executor = new Thread(new PublisherTask(), "nacos-publisher");
}
@Override
public void init(Class<? extends Event> type, int bufferSize) {
this.initialized = true;
this.executor.start();
}
@Override
public boolean publish(Event event) {
if (shutdown) {
return false;
}
// 非阻塞方式添加到队列
boolean success = queue.offer(event);
if (!success) {
log.warn("Event queue is full, event dropped: {}", event);
}
return success;
}
// 发布任务
private class PublisherTask implements Runnable {
@Override
public void run() {
while (!shutdown) {
try {
// 从队列中取出事件
Event event = queue.take();
// 获取订阅者列表
List<Subscriber> subscribers = getSubscribers(event.getClass());
// 通知所有订阅者
for (Subscriber subscriber : subscribers) {
try {
subscriber.onEvent(event);
} catch (Exception e) {
log.error("Subscriber {} handle event {} failed",
subscriber, event, e);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Publisher task error", e);
}
}
}
}
}
2. SharePublisher(共享发布器)
// SharePublisher.java - 共享事件发布器(多个事件类型共享一个发布器)
public class SharePublisher implements EventPublisher {
private final Map<Class<? extends Event>, List<Subscriber>> subscriberMap = new ConcurrentHashMap<>();
private final BlockingQueue<Event> queue;
private final Thread executor;
public SharePublisher() {
this.queue = new ArrayBlockingQueue<>(1024);
this.executor = new Thread(new SharePublisherTask(), "nacos-share-publisher");
}
@Override
public boolean publish(Event event) {
return queue.offer(event);
}
@Override
public void addSubscriber(Subscriber subscriber) {
Class<? extends Event> eventType = subscriber.subscribeType();
subscriberMap.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
.add(subscriber);
}
private class SharePublisherTask implements Runnable {
@Override
public void run() {
while (!shutdown) {
try {
Event event = queue.take();
List<Subscriber> subscribers = subscriberMap.get(event.getClass());
if (subscribers != null) {
for (Subscriber subscriber : subscribers) {
subscriber.onEvent(event);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
事件订阅者实现
1. Subscriber接口
// Subscriber.java - 事件订阅者接口
public interface Subscriber<T extends Event> {
/**
* 订阅的事件类型
*/
Class<? extends Event> subscribeType();
/**
* 处理事件
*/
void onEvent(T event);
/**
* 事件处理是否忽略过期事件
*/
default boolean ignoreExpireEvent() {
return false;
}
}
2. 服务变更订阅者示例
// ServiceChangeSubscriber.java - 服务变更订阅者
@Component
public class ServiceChangeSubscriber implements Subscriber<ServiceChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ServiceChangeEvent.class;
}
@Override
public void onEvent(ServiceChangeEvent event) {
Service service = event.getService();
List<Instance> instances = event.getInstances();
// 更新本地服务列表缓存
updateLocalServiceCache(service.getName(), instances);
// 通知客户端服务变更
notifyClients(service.getName(), instances);
log.info("Service {} changed, instances: {}", service.getName(), instances.size());
}
private void updateLocalServiceCache(String serviceName, List<Instance> instances) {
// 更新本地缓存
serviceCache.put(serviceName, instances);
}
private void notifyClients(String serviceName, List<Instance> instances) {
// 获取订阅该服务的客户端
Set<String> subscribers = getSubscribers(serviceName);
for (String clientId : subscribers) {
// 通过UDP推送服务变更
pushServiceInfo(clientId, serviceName, instances);
}
}
}
3. 配置变更订阅者示例
// ConfigChangeSubscriber.java - 配置变更订阅者
@Component
public class ConfigChangeSubscriber implements Subscriber<ConfigChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ConfigChangeEvent.class;
}
@Override
public void onEvent(ConfigChangeEvent event) {
String dataId = event.getDataId();
String group = event.getGroup();
String content = event.getContent();
// 更新配置缓存
configCache.put(buildKey(dataId, group), content);
// 通知订阅该配置的客户端
notifyConfigSubscribers(dataId, group, content);
log.info("Config changed: {}/{}", group, dataId);
}
private void notifyConfigSubscribers(String dataId, String group, String content) {
// 获取订阅该配置的客户端
Set<String> subscribers = getConfigSubscribers(dataId, group);
for (String clientId : subscribers) {
// 通过长轮询推送配置变更
pushConfigChange(clientId, dataId, group, content);
}
}
}
事件发布流程
1. 服务注册事件发布
// InstanceController.java - 服务注册时发布事件
@PostMapping("/instance")
public String register(HttpServletRequest request) {
// 1. 注册实例
Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
// 2. 发布实例注册事件
NotifyCenter.publishEvent(new InstanceRegisterEvent(serviceName, instance));
// 3. 发布服务变更事件
Service service = serviceManager.getService(namespaceId, serviceName);
NotifyCenter.publishEvent(new ServiceChangeEvent(service));
return "ok";
}
2. 服务注销事件发布
// InstanceController.java - 服务注销时发布事件
@DeleteMapping("/instance")
public String deregister(HttpServletRequest request) {
Instance instance = parseInstance(request);
// 1. 注销实例
serviceManager.removeInstance(namespaceId, serviceName, instance);
// 2. 发布实例注销事件
NotifyCenter.publishEvent(new InstanceDeregisterEvent(serviceName, instance));
// 3. 发布服务变更事件
Service service = serviceManager.getService(namespaceId, serviceName);
NotifyCenter.publishEvent(new ServiceChangeEvent(service));
return "ok";
}
3. 心跳事件发布
// InstanceController.java - 心跳时发布事件
@PutMapping("/beat")
public JsonNode beat(HttpServletRequest request) {
BeatInfo beatInfo = parseBeatInfo(request);
Instance instance = serviceManager.getInstance(beatInfo);
// 更新心跳时间
instance.setLastBeat(System.currentTimeMillis());
// 发布心跳事件
NotifyCenter.publishEvent(new InstanceHeartbeatEvent(instance));
return buildBeatResponse(instance);
}
事件监听的应用场景
1. 服务变更通知客户端
// ServiceChangeNotifier.java - 服务变更通知器
@Component
public class ServiceChangeNotifier implements Subscriber<ServiceChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ServiceChangeEvent.class;
}
@Override
public void onEvent(ServiceChangeEvent event) {
Service service = event.getService();
// 获取订阅该服务的客户端
Set<PushClient> clients = getSubscribedClients(service);
// 通过UDP推送服务变更
for (PushClient client : clients) {
try {
pushServiceInfo(client, service);
} catch (Exception e) {
log.error("Push service info to client {} failed", client, e);
}
}
}
}
2. 配置变更推送
// ConfigChangeNotifier.java - 配置变更通知器
@Component
public class ConfigChangeNotifier implements Subscriber<ConfigChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ConfigChangeEvent.class;
}
@Override
public void onEvent(ConfigChangeEvent event) {
String dataId = event.getDataId();
String group = event.getGroup();
// 获取订阅该配置的客户端
Set<LongPollingClient> clients = getConfigSubscribers(dataId, group);
// 通过长轮询推送配置变更
for (LongPollingClient client : clients) {
try {
client.sendResponse(dataId, group, event.getContent());
} catch (Exception e) {
log.error("Push config to client {} failed", client, e);
}
}
}
}
3. 监控指标收集
// MetricsCollector.java - 监控指标收集器
@Component
public class MetricsCollector implements Subscriber<InstanceHeartbeatEvent> {
private final MeterRegistry meterRegistry;
@Override
public Class<? extends Event> subscribeType() {
return InstanceHeartbeatEvent.class;
}
@Override
public void onEvent(InstanceHeartbeatEvent event) {
Instance instance = event.getInstance();
// 记录心跳指标
meterRegistry.counter("nacos.heartbeat.total",
"service", instance.getServiceName(),
"ip", instance.getIp()
).increment();
}
}
性能优化
1. 异步事件处理
// 事件发布采用异步方式,不阻塞主流程
public boolean publish(Event event) {
// 非阻塞方式添加到队列
return queue.offer(event);
}
2. 事件去重
// 相同事件去重,避免重复处理
public class EventDeduplicator {
private final Set<String> processedEvents = new ConcurrentHashMap<>().newKeySet();
public boolean shouldProcess(Event event) {
String key = buildEventKey(event);
// 如果已处理,跳过
if (processedEvents.contains(key)) {
return false;
}
// 标记为已处理
processedEvents.add(key);
// 定期清理过期事件
cleanupExpiredEvents();
return true;
}
}
3. 订阅者优先级
// 支持订阅者优先级,重要订阅者优先处理
public interface Subscriber<T extends Event> {
/**
* 订阅者优先级(数字越小优先级越高)
*/
default int priority() {
return 0;
}
}
// 按优先级排序订阅者
List<Subscriber> subscribers = getSubscribers(eventType);
subscribers.sort(Comparator.comparingInt(Subscriber::priority));
for (Subscriber subscriber : subscribers) {
subscriber.onEvent(event);
}
分布式场景考量
1. 集群事件同步
// 集群环境下,事件需要同步到其他节点
public class ClusterEventPublisher implements EventPublisher {
@Override
public boolean publish(Event event) {
// 1. 本地发布
localPublish(event);
// 2. 同步到其他节点
if (event instanceof ClusterEvent) {
syncToOtherNodes(event);
}
return true;
}
private void syncToOtherNodes(Event event) {
for (Member member : clusterMembers) {
if (!member.isSelf()) {
distroProtocol.sync(event, member);
}
}
}
}
2. 事件顺序保证
// 对于需要保证顺序的事件,使用单线程处理
public class OrderedEventPublisher implements EventPublisher {
private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
private final Thread executor = new Thread(new OrderedPublisherTask());
private class OrderedPublisherTask implements Runnable {
@Override
public void run() {
while (!shutdown) {
try {
Event event = queue.take();
// 单线程顺序处理
processEvent(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
实战示例
1. 自定义事件订阅
// 自定义服务变更监听器
@Component
public class CustomServiceChangeListener implements Subscriber<ServiceChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ServiceChangeEvent.class;
}
@Override
public void onEvent(ServiceChangeEvent event) {
Service service = event.getService();
List<Instance> instances = event.getInstances();
// 自定义处理逻辑
log.info("Service {} changed, total instances: {}",
service.getName(), instances.size());
// 更新负载均衡器
updateLoadBalancer(service.getName(), instances);
// 发送告警(如果实例数变化较大)
checkAndAlert(service, instances);
}
@PostConstruct
public void init() {
// 注册订阅者
NotifyCenter.registerSubscriber(this);
}
}
2. 配置变更监听
// 配置变更监听器
@Component
public class ConfigChangeListener implements Subscriber<ConfigChangeEvent> {
@Override
public Class<? extends Event> subscribeType() {
return ConfigChangeEvent.class;
}
@Override
public void onEvent(ConfigChangeEvent event) {
String dataId = event.getDataId();
String group = event.getGroup();
String content = event.getContent();
// 重新加载配置
reloadConfig(dataId, group, content);
// 通知相关组件
notifyComponents(dataId, group);
}
@PostConstruct
public void init() {
NotifyCenter.registerSubscriber(this);
}
}
面试总结
Nacos事件监听机制核心要点:
- 核心组件:
- NotifyCenter:事件通知中心,单例模式
- EventPublisher:事件发布器,异步处理
- Subscriber:事件订阅者,处理特定事件
- 事件类型:
- 服务注册发现:ServiceChangeEvent、InstanceRegisterEvent等
- 配置管理:ConfigChangeEvent、ConfigRemoveEvent
- 元数据:InstanceMetadataEvent
- 发布订阅模式:
- 发布:
NotifyCenter.publishEvent(event) - 订阅:
NotifyCenter.registerSubscriber(subscriber) - 异步处理,不阻塞主流程
- 发布:
- 性能优化:
- 异步事件处理
- 事件去重
- 订阅者优先级
- 应用场景:
- 服务变更通知客户端
- 配置变更推送
- 监控指标收集
技术亮点:
- 统一的NotifyCenter管理所有事件
- 支持多种事件发布器(DefaultPublisher、SharePublisher)
- 异步处理,高性能
- 完善的订阅者管理机制
设计模式:
- 观察者模式:事件发布和订阅
- 单例模式:NotifyCenter单例
- 策略模式:不同的事件发布器策略