吾爱破解 - LCG - LSG |安卓破解|病毒分析|www.52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1334|回复: 10
收起左侧

[Java 原创] 深入剖析Spring Cloud源码系列 - Nacos注册中心原理

[复制链接]
siegod 发表于 2023-8-31 00:40
本帖最后由 siegod 于 2023-8-31 13:40 编辑

前言

Nacos(全称为"Naming and Configuration Service")是一个开源的服务发现和配置管理平台。作为一个注册中心,Nacos提供了服务注册、服务发现、服务心跳和服务健康检测功能,使得微服务架构中的各个服务可以相互发现和通信。

本文将深入探讨Nacos注册中心的原理,特别关注CP模式与AP模式的区别与优劣

环境

版本:Nacos-1.3.0

掘金原文

一、Nacos注册中心 与 Spring Cloud 整合

pCqhnCn.png

通过上一篇《深入剖析Spring Boot源码系列 - 自动装配原理》,我们知道一般组件与spring cloud整合时,要去加载spring.factories,接下我们看看spring.factories文件都有些什么?

  • spring.factories

pCLGE1x.png

  • Nacos 服务注册自动装配 - NacosServiceRegistryAutoConfiguration
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled // 仅当启用Nacos的服务发现功能时才生效
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true) // 当未配置该属性时,默认为启用服务自动注册功能
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class }) // 在指定配置类之后,当前配置类自动配置
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

}
  • NacosAutoServiceRegistration : 用于在 Spring Cloud 应用中自动触发(例如 订阅事件)将服务实例注册到 Nacos 服务注册中心
  • NacosServiceRegistry : 实现了 Spring Cloud Commons 中的 ServiceRegistry 接口的具体类,该接口是为注册中心提供统一API((门面模式) ,用于抽象不同注册中心的差异,无需关心底层注册中心的具体实现。

接下来,我们展开对NacosAutoServiceRegistrationNacosServiceRegistry进行分析

二、Nacos客户端-服务注册

  • NacosAutoServiceRegistration
//NacosAutoServiceRegistration 继承了AbstractAutoServiceRegistration
//先看看AbstractAutoServiceRegistration
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {

    ...省略
}

/**
 * AbstractAutoServiceRegistration 是服务自动注册的抽象类
 * 继承了ApplicationListener<WebServerInitializedEvent> 接口
 * 用于监听 Web 服务器初始化事件和获取 Spring 应用程序上下文。
 *
 * @param <R> 服务注册的具体类型,继承 Registration 接口
 */
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    ...省略

    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        //1.处理 Web 服务器初始化事件,委派给 bind 方法处理
        bind(event);
    }

    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        //在应用上下文中获取 Web 服务器的端口号并调用 start 方法
        this.port.compareAndSet(0, event.getWebServer().getPort());
        //2. 启动服务自动注册过程
        this.start();
    }

    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }

        // 只有在非安全端口大于0且服务未运行时才初始化
        if (!this.running.get()) {
            //发布前置注册事件
            this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
            //3. 调用子类钩子
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            //发布后置注册事件
            this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
            //变更运行状态
            this.running.compareAndSet(false, true);
        }
    }

    protected void register() {
        //5. 注册服务实例到服务注册中心
        this.serviceRegistry.register(getRegistration());
    }

    //...省略
}

public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {

    ...省略

    /**
     * 重写模板父类的钩子 register 方法,执行服务注册的逻辑。
     */
    @Override
    protected void register() {
        // 检查注册是否被禁用
        if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
            log.debug("Registration disabled.");
            return;
        }
        // 如果服务实例的端口号小于0,则将端口号设置为当前获取到的端口号
        if (this.registration.getPort() < 0) {
            this.registration.setPort(getPort().get());
        }
        // 4. 调用父类的 register 方法,实现服务注册
        super.register();
    }
}

在这里,在Nacos注册中心使用AbstractAutoServiceRegistration来监听WebServerInitializedEvent事件。

当Spring核心逻辑执行完成刷新(finishRefresh())时,会发布WebServerInitializedEvent事件。此事件由AbstractAutoServiceRegistrationonApplicationEvent方法响应,从而启动服务的自动注册流程。

具体实现是通过NacosAutoServiceRegistration#register()方法来进行前置参数检查,并最终调用NacosServiceRegistry#register()方法来实现服务的注册。

接下来,我们将深入探究NacosServiceRegistry的服务注册实现。

AbstractAutoServiceRegistration 中定义了start()方法作为模板方法,用于定制服务自动注册的整体流程。具体的实现类(如NacosAutoServiceRegistration)则可以通过重写register()方法来添加自定义的逻辑,从而实现钩子(hook)功能,将自定义逻辑插入到服务自动注册的流程中。

  • NacosServiceRegistry分析
/**
 * NacosServiceRegistry 是 Nacos 服务注册中心的实现类,实现了 ServiceRegistry 接口。
 */
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    private final NacosDiscoveryProperties nacosDiscoveryProperties; //Nacos 服务发现属性

    private final NamingService namingService; //Nacos网络通信服务

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        //通过NacosFatory工厂创建NacosNamingService网络通信
        this.namingService = nacosDiscoveryProperties.namingServiceInstance();
    }

    /**
     * 注册服务实例到 Nacos 服务注册中心。
     */
    @Override
    public void register(Registration registration) {

        ...省略

        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        // 封装服务实例信息
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            // 调用 Nacos 的 API 注册服务实例
            namingService.registerInstance(serviceId, group, instance);

        } catch (Exception e) {
            log.error("Nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
            rethrowRuntimeException(e);
        }
    }

    //服务实例注销
    @Override
    public void deregister(Registration registration) {
        // 省略具体实现;
    }

    @Override
    public void close() {
        // 这里可以添加一些关闭资源的逻辑
    }

    //设置服务实例的状态。
    @Override
    public void setStatus(Registration registration, String status) {
        // 省略具体实现
    }

    //获取服务实例的状态。
    @Override
    public Object getStatus(Registration registration) {
        // 省略具体实现
        return null;
    }
}

//spring could 提供统一接口功能的门面模式
public interface ServiceRegistry<R extends Registration> {

    //服务实例注册
    void register(R registration);

    //服务实例销毁
    void deregister(R registration);

    //关闭服务实例注册
    void close();

    //设置服务实例的状态
    void setStatus(R registration, String status);

    //获取服务实例的状态。
    <T> T getStatus(R registration);
}

根据上述代码,可以看到NacosServiceRegistry使用NacosNamingService 网络通信服务来实现服务实例的注册和注销功能。

NacosServiceRegistry # register() 方法中,先根据服务注册信息构建 Nacos 的 Instance 对象,然后调用 NacosNamingService # registerInstance() 方法来注册服务实例。

  • NacosNamingService分析
/**
 * NacosNamingService 是 Nacos 客户端的HTTP网络通信,实现了 NamingService 接口。
 * 用于注册和注销服务实例,并将服务实例的信息存储在 Nacos 服务注册中心中。
 */
public class NacosNamingService implements NamingService {

    //...省略

    /**
     * 用于处理服务实例心跳事件的 BeatReactor 对象。
     */
    private BeatReactor beatReactor;

    /**
     * 用于与 Nacos 服务端交互的 NamingProxy 对象。
     */
    private NamingProxy serverProxy;

    /**
     * 根据传入的 serverList 初始化 NacosNamingService。
     *
     * @param serverList Nacos 服务地址列表
     */
    public NacosNamingService(String serverList) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);

        //初始化 NacosNamingService
        init(properties);
    }

    /**
     * 构造方法,根据传入的 properties 初始化 NacosNamingService。
     *
     * @param properties Nacos 服务配置属性
     */
    public NacosNamingService(Properties properties) {
        init(properties);
    }

    /**
     * 初始化 NacosNamingService。
     */
    private void init(Properties properties) {
        // 初始化 namespace
        namespace = InitUtils.initNamespaceForNaming(properties);

        // 初始化 endpoint 和 serverList
        initServerAddr(properties);

        //...省略

        //用与 Nacos 服务端交互
        serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
        //处理服务实例心跳事件的 BeatReactor 对象。
        beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    }

    // 省略一些私有初始化方法

    /**
     * 注册服务实例到 Nacos 服务注册中心。
     */
    @Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    /**
     * 注册服务实例到 Nacos 服务注册中心。
     */
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // 如果服务实例是临时实例,则添加到心跳信息中
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

            //添加临时实例定时心跳任务,每隔5秒发送
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        // 调用 NamingProxy 的 registerService 方法
        // 使用随机负载均衡 HTTP请求 POST /nacos/v1/ns/instance Nacosk服务端注册服务实例
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

    // 省略其他方法实现
}

核心功能- 服务心跳,先判断服务实例是否为临时实例(Ephemeral)。如果是临时实例,表示这个实例不会在注册中心长期存活,而是需要发送服务心跳 (每隔5秒) 来维持其在注册中心的存活状态。通过这种方式,Nacos能够实时感知服务实例的存活状态,从而提供实时的服务发现和负载均衡功能。

下面,我们看看http post请求到Nacos服务端时,服务注册是如何进行的

三、Nacos服务端-服务注册

第一步:Controller层

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    /**
     * 注册新的服务实例。
     *
     * @param request HTTP 请求对象
     * @return 若注册成功,则返回 'ok'
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        // 获取服务名和命名空间ID
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);

        // 解析HTTP请求中的实例信息
        final Instance instance = parseInstance(request);

        // 将实例信息注册到指定的服务中
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}

第二步 Manage层

@Component
public class ServiceManager implements RecordListener<Service> {

    //依赖注入DelegateConsistencyServiceImpl服务
    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    //...省略

    /**
     * 在AP模式下,将一个实例注册到指定的服务中。
     *
     * <p>如果服务或集群不存在,该方法会默默地创建它们。
     *
     * @param namespaceId 命名空间ID
     * @param serviceName 服务名
     * @param instance    要注册的实例
     * @throws NacosException 注册过程中发生的任何异常都会抛出
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

        //创建服务,第一次投递定时任务,每隔5秒服务健康检查
        //通过serviceName的hash值与nacos服务端集群取模选择一台执行任务,非当前服务端处理需要进行转发
        //对15秒未发送心跳实例修改健康状态为false,30秒未发送心跳进行服务实例剔除
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        // 获取指定服务的实例
        Service service = getService(namespaceId, serviceName);

        // ...省略

        // 将实例添加到服务中
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

        /**
     * 添加实例到指定的服务中。
     *
     * @param namespaceId 命名空间ID
     * @param serviceName 服务名
     * @param ephemeral   实例是否为临时性实例
     * @param ips          需要添加的实例列表
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        // 构建用于选择CP模式或者AP模式一致性服务的键值
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        // 获取指定服务的信息
        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            // 添加IP地址到指定服务,并得到更新后的实例列表
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            // 创建一个包含更新后实例列表的Instances对象
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            // 将实例列表放入一致性服务中,以确保实例信息在集群中保持一致
            consistencyService.put(key, instances);
        }
    }
}

第三步: 一致性委托服务选择CP或者AP模式服务

@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    // CP模式一致性服务,用于处理持久性数据
    private final PersistentConsistencyService persistentConsistencyService;
    // AP模式一致性服务,用于处理临时性数据
    private final EphemeralConsistencyService ephemeralConsistencyService;

    /**
     * 初始化委托一致性服务实例。
     *
     * @param persistentConsistencyService CP模式一致性服务实例
     * @param ephemeralConsistencyService  AP模式一致性服务实例
     */
    public DelegateConsistencyServiceImpl(
            PersistentConsistencyService persistentConsistencyService,
            EphemeralConsistencyService ephemeralConsistencyService) {
        this.persistentConsistencyService = persistentConsistencyService;
        this.ephemeralConsistencyService = ephemeralConsistencyService;
    }

    //...省略

    @Override
    public void put(String key, Record value) throws NacosException {
        //选择AP模式或者CP模式服务
        mapConsistencyService(key).put(key, value);
    }

    /**
     * 根据给定的键值选择对应的一致性服务。
     *
     * @return 选择的一致性服务,如果是临时性实例则返回ephemeralConsistencyService,否则返回persistentConsistencyService
     */
    private ConsistencyService mapConsistencyService(String key) {
        // 判断给定的键值是否匹配临时性实例的键模式,根据结果选择对应的一致性服务
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
}

上述代码很简单,分层很符合国内代码习惯,每层做了以下几件事:

  • Controller层: 解析HTTP请求中的实例信息
  • Manager层:对于第一次注册的服务实例添加定时任务进行 服务健康检查 。通过将serviceNamehash值与Nacos服务端集群的节点数取模来选择一台执行心跳检测任务。如果选择的不是当前服务端,还需要将任务转发到目标服务端做心跳检测维护服务实例的健康状态,如果一个服务实例在15秒内没有发送心跳,则会被标记为不健康状态;如果在30秒内没有发送心跳,则会被剔除。
  • Service层:DelegateConsistencyServiceImpl提供一致性委托服务,根据服务实例是否为临时节点,选择CP模式或AP模式的服务。

3.1 Nacos服务端 - AP模式

/**
*   AP模式-distro协议
**/

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private final DistroMapper distroMapper; // 数据分发映射器,用于在集群中分发数据。
    private final DataStore dataStore; // 数据存储器,用于持久化和管理分布式数据。
    private final TaskDispatcher taskDispatcher; // 任务调度器,用于处理与数据一致性相关的任务。

    private final ServerMemberManager memberManager; // 服务器成员管理器,用于管理集群中的服务器成员。
    private final GlobalConfig globalConfig; // 全局配置。

    private volatile Notifier notifier = new Notifier(); // 用于通知数据更新的通知器。

    // 监听器映射,用于存储不同Key的监听器队列。
    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();

    // 同步校验和任务映射,用于存储待同步校验和的任务。
    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore,
                                        TaskDispatcher taskDispatcher, Serializer serializer,
                                        ServerMemberManager memberManager, SwitchDomain switchDomain,
                                        GlobalConfig globalConfig) {
        //数据分发映射器
        this.distroMapper = distroMapper;
        //数据存储器
        this.dataStore = dataStore;
        //任务调度器
        this.taskDispatcher = taskDispatcher;
        //序列化器
        this.serializer = serializer;
        //集群管理
        this.memberManager = memberManager;
        this.switchDomain = switchDomain;
        //全局配置
        this.globalConfig = globalConfig;
    }

    /**
     * 初始化方法,在 PostConstruct 阶段执行。
     */
    @PostConstruct
    public void init() {
        // 开启加载数据的任务
        GlobalExecutor.submit(loadDataTask);
        // 开启数据更新通知任务
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    /**
     * 添加给定的记录
     */
    @Override
    public void put(String key, Record value) throws NacosException {
        // 1. 将注册实例 状态为ApplyAction.CHANGE 投递 notifier
        onPut(key, value);
        // 2. 服务实例 -> 投递任务调度器 -> 数据同步预处理 -> 投递数据同步器 -> 同步服务实例给其他集群节点
        taskDispatcher.addTask(key); 
    }
}

这段代码是一个实现了 EphemeralConsistencyService 接口的 DistroConsistencyServiceImpl 类。它提供了 AP 模式下的数据分发与一致性服务。

  • onPut : 投递到 notifier 通知器,以触发数据更新事件
  • taskDispatcher#addTask: 任务投递到任务调度器,进行数据赞批或者定期,最终进行数据同步操作,将服务实例同步给其他集群节点。

接下分析分析 onPut(key, value)taskDispatcher.addTask(key) 做了些什么?

  • onPut分析
public void onPut(String key, Record value) {
    // 1.1 如果key匹配临时性实例列表的key模式
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // 创建一个包含数据记录的临时性实例列表
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // 将数据记录存储到数据存储器中
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }

    // 1.2 向通知器中添加数据变更任务,Action为CHANGE
    notifier.addTask(key, ApplyAction.CHANGE);
}

/**
 * 任务通知器
 */
public class Notifier implements Runnable {

    // 存储服务信息,key为服务名,value为一个空字符串
    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

    // 任务阻塞队列,存储key和动作
    private BlockingQueue<Pair<String, ApplyAction>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

    /**
     * 添加通知任务。
     */
    public void addTask(String datumKey, ApplyAction action) {

        // 1.2.1 services中已经包含了datumKey
        //并且操作动作是CHANGE,则直接返回,避免重复通知
        if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            return;
        }

        // 操作动作是CHANGE,则将datumKey添加到services中
        if (action == ApplyAction.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }

        // 1.2.2 将任务加入阻塞队列中
        tasks.offer(Pair.with(datumKey, action));
    }

    /**
     * 通知任务的执行方法,当任务队列中有任务时,循环执行任务。
     */
    @Override
    public void run() {
        ...省略
        for (; ; ) {
            try {
                // 1.3.1 循环take阻塞从任务队列中取出任务
                Pair<String, ApplyAction> pair = tasks.take();
                // 1.3.2 处理任务
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

    /**
     * 处理通知任务。
     *
     */
    private void handle(Pair<String, ApplyAction> pair) {
        try {
            String datumKey = pair.getValue0();
            ApplyAction action = pair.getValue1();

            // 1.3.3 将datumKey从移除服务信息中
            services.remove(datumKey);
            int count = 0;
            // 检查监听器中是否包含该key
            if (!listeners.containsKey(datumKey)) {
                return;

            // 遍历监听器
            for (RecordListener listener : listeners.get(datumKey)) {
                count++;
                try {
                    if (action == ApplyAction.CHANGE) {
                        // 1.3.4 如果操作动作是CHANGE,则调用监听器的onChange方法
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }

                    if (action == ApplyAction.DELETE) {
                        // 如果操作动作是DELETE,则调用监听器的onDelete方法
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}",
                            datumKey, e);
                }
            }
            ...省略
    }
}

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

            ...省略  

            @Override
        public void onChange(String key, Instances value) throws Exception {
            ...省略
            // 1.4. 更新实例的IP地址列表
            updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
            // 重新计算数据校验和
            recalculateChecksum();
        }

        public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
            // 存储每个集群对应实例列表的映射
            Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
            for (String clusterName : clusterMap.keySet()) {
                ipMap.put(clusterName, new ArrayList<>());
            }

            ...省略

            // 1.4.1 遍历每个集群的实例列表
            for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
                List<Instance> entryIPs = entry.getValue();
                //发生服务发现时,会从Cluster维护临时节点集合进行查找
                //更新集群中的实例信息,用到了CopyOnWrite思想
                clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
            }

            // 更新服务的最后修改时间戳
            setLastModifiedMillis(System.currentTimeMillis());

            /**
            *   1.4.2 发布ServiceChangeEvent事件,通知UDP推送服务实例变化服务
            *   
            **/
            getPushService().serviceChanged(this);

            ...省略
        }
}

onPut方法中,首先通过KeyBuilder.matchEphemeralInstanceListKey(key)判断key是否匹配临时性实例列表的key模式,如果匹配,则将实例信息封装成Datum<Instances>对象,并异步地通过Notifier线程处理,将数据记录存储到数据存储器中(dataStore.put(key, datum))。这样异步处理避免了阻塞主线程,提高了处理效率。

接着,在数据处理完成后,Nacos通过发布ServiceChangeEvent事件,触发UDP推送服务,将服务实例变动通知给订阅的客户端。这种方式相对于ZooKeeperTCP长连接模式,确实节约了很多资源,尤其在大量节点更新时不会出现性能瓶颈。虽然UDP推送不能保证数据可靠性,但Nacos客户端通过定时任务轮询(每隔1秒)进行服务发现的方式做兜底,可以保证数据的最终一致性和可靠性。这种 服务端UDP推送+客户端定时轮询的方式在很多实际场景中能够有效平衡实时性和数据可靠性的需求。

  • taskDispatcher.addTask(key)分析
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig; // 全局配置 - 配置任务的执行策略

    @Autowired
    private DataSyncer dataSyncer; // 数据同步器

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>(); // 任务调度器列表

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors(); // 获取CPU核心数

    @PostConstruct
    public void init() {
        // 根据CPU核心数创建相应数量的任务调度器,并将它们加入任务调度器列表中
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // 使用全局执行器(GlobalExecutor)提交任务调度器的线程执行
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    /**
     * 添加任务到任务调度器中
     */
    public void addTask(String key) {
        // 2.1 将任务添加到相应的任务调度器中 TaskScheduler#addTask()
        // 使用UtilsAndCommons.shakeUp()方法计算任务应该分发到哪个任务调度器
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }
}

//任务调度器
public class TaskScheduler implements Runnable {

    ...省略

    public void addTask(String key) {
        // 将任务投递到队列中
        queue.offer(key); 
    }

    ...省略

    @Override
    public void run() {
        // 任务调度器的执行逻辑
        List<String> keys = new ArrayList<>();
        while (true) { // 循环执行任务调度逻辑

            try {
                // 2.2.1 从任务队列中获取任务,等待一定的时间,如果没有任务则继续下一次循环
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);

                ...省略

                // 2.2.2 将任务key添加到keys列表中攒批
                keys.add(key);
                dataSize++;

                // 2.2.3 判断是否达到了批量同步的条件,即数据量达到了批量同步的阈值(默认1000)或距离上次派发任务的时间超过了任务派发周期(默认2000毫秒)
                if (dataSize == partitionConfig.getBatchSyncKeyCount()
                        || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                    // 遍历所有数据同步器的服务节点
                    for (Member member : dataSyncer.getServers()) {
                        // 如果与本地节点相同,跳过本次循环
                        if (NetUtils.localServer().equals(member.getAddress())) {
                            continue;
                        }
                        // 创建同步任务SyncTask
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getAddress());

                        // 2.3 提交同步任务到数据同步器中进行处理
                        dataSyncer.submit(syncTask, 0);
                    }

                    // 2.4. 更新任务的最后派发时间,并将dataSize重置为0
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }

            } catch (Exception e) {
                Loggers.DISTRO.error("dispatch sync task failed.", e);
            }
        }
    }
}

public class DataSyncer {
    ...

    // 标记任务是否正在处理中
    private Map<String, String> taskMap = new ConcurrentHashMap<>(16);

    ...

    // 提交一个同步任务进行处理
    public void submit(SyncTask task, long delay) {
        // 如果是新任务:
        if (task.getRetryCount() == 0) {

            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                // 将每个key与目标服务器的组合作为key,加入到任务映射表 taskMap 中
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // 如果该键值对应的组合已存在于 taskMap 中,表示已经有一个相同的任务在进行中
                    // 则从当前任务的键值集合中移除该键值
                    iterator.remove();
                }
            }
        }

        // 如果任务中的键值集合为空,则说明所有的键值都已被移除,不需要进行数据同步
        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }

        // 将数据同步任务提交给全局执行器 GlobalExecutor 进行异步处理,指定延迟执行时间 delay
        GlobalExecutor.submitDataSync(() -> {
            // 2.3.1 检查服务器列表是否为空
            if (getServers() == null || getServers().isEmpty()) {
                return;
            }

            // 获取当前任务中的key集合
            List<String> keys = task.getKeys();

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }

            // 2.3.2 批量获取key对应的数据,并检查数据是否为空
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            if (datumMap == null || datumMap.isEmpty()) {
                // 如果数据为空,清除当前任务的所有标志
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }

            // 将数据序列化成字节数组
            byte[] data = serializer.serialize(datumMap);

            // 记录当前时间戳
            long timestamp = System.currentTimeMillis();
            // 3.3 调用 NamingProxy网络通信 的 syncData 方法将数据同步到其他集群服务器
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                // 如果同步失败,创建新的重试任务,并将重试任务提交给 retrySync 方法
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // 如果同步成功,清除当前任务的所有标志
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }
}

这段代码涉及到两个关键组件:TaskDispatcherDataSyncerTaskDispatcher 负责任务的分发和调度,DataSyncer 负责任务数据的同步处理

  1. TaskDispatcher 组件:

    • 在Bean初始化阶段(@PostConstruct)时,根据 CPU 核心数创建对应数量的任务调度器实例(TaskScheduler),并将它们添加到任务调度器列表中。
    • 每个任务调度器是一个独立的线程,在启动后会循环从任务队列中取出任务并进行处理。
    • 添加任务时,通过 UtilsAndCommons.shakeUp() 方法计算应该将任务分发到哪个任务调度器,并将任务添加到相应的任务调度器中。
    • TaskScheduler 任务调度器的主要逻辑是收集任务到一定数量或等待一定时间后,将任务分发给数据同步器(DataSyncer)处理。
  2. DataSyncer 组件:

    • 数据同步器会提交任务到全局执行器(GlobalExecutor),实现数据同步任务的异步处理。
    • 任务的同步机制采用了重试机制,当同步任务失败时,会创建新的重试任务,并将重试任务提交给 retrySync 方法。
    • taskMap 是用于标记任务是否正在处理中的任务映射表。

3.2 Nacos服务端 - CP模式

@DependsOn("ProtocolManager") 
@Service 
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {

    @Autowired
    private RaftCore raftCore; // Raft 协议的核心组件

    @Autowired
    private RaftPeerSet peers; // Raft 协议中的节点集合

    @Autowired
    private SwitchDomain switchDomain; // 包含各种开关配置

    /**
     * 将给定的记录持久化到集群中,实现了 PersistentConsistencyService 接口的 put 方法。
     */
    @Override
    public void put(String key, Record value) throws NacosException {
        try {
            // 1. 调用 RaftCore#signalPublish 方法,将数据存入 Raft 日志,并发起 Raft 协议的数据复制与一致性处理
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            // 处理异常情况,记录错误日志,并抛出 NacosException
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
        }
    }
}

@DependsOn("ProtocolManager")
@Component
public class RaftCore {

    ...省略

    /**
     * 发送数据变更信号,将指定的记录进行发布。
     */
    public void signalPublish(String key, Record value) throws Exception {
        // 1. 如果当前节点不是 Leader 节点,则将数据变更信号发送给 Leader 节点处理
        if (!isLeader()) {
            ObjectNode params = JacksonUtils.createEmptyJsonNode();
            params.put("key", key);
            params.replace("value", JacksonUtils.transferToJsonNode(value));
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);

            final RaftPeer leader = getLeader();
            // 使用 Raft 代{过}{滤}理方法将数据变更信号发送给 Leader 节点
            raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
            return;
        }

        try {
            OPERATE_LOCK.lock();
            long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            // 获取记录的时间戳,并更新到当前记录的时间戳中
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }

            ObjectNode json = JacksonUtils.createEmptyJsonNode();
            json.replace("datum", JacksonUtils.transferToJsonNode(datum));
            json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

            // 2. 数据持久化同步写入磁盘,并投递数据更新通知队列更新内存注册列表 
            onPublish(datum, peers.local());

            final String content = json.toString();

            //3. CountDownLatch 异步实现简单的Raft机制半数ack成功
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // 遍历所有节点,将数据更新通知发送给其他节点
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    // 如果当前节点是 Leader 节点,则不需要通知自己,直接跳过当前节点
                    latch.countDown();
                    continue;
                }
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            // 如果数据更新通知发送失败,则记录警告日志,并返回 1
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }

            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // 如果在规定的时间内没有得到大多数节点的成功响应,则认为数据发布失败,记录错误日志,并抛出 IllegalStateException 异常
                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
        } finally {
            OPERATE_LOCK.unlock();
        }
    }    

}

根据上述代码,可以看出Nacos的CP模式使用了简化版的Raft一致性算法,利用CountDownLatch实现了多数节点确认数据更新的过程。然而,它并没有严格遵循传统的两阶段提交(2PC)协议。

在传统的CP模式中,数据同步的第一阶段通常对用户是不可见的,直到二阶段数据提交后才对用户可见,从而提供了强一致性保证。而Nacos CP模式在第一阶段进行了数据的更新/持久化以及异步复制数据给集群节点,并没有第二阶段,这与传统2PC的行为不完全一致。如果出现数据不一致,依赖定期的集群同步来进行补救。总的来说,我觉得它并不能完全满足传统CP模式的特性。

以下是一个常见Raft协议的例子:

RocketMQ消息中间件 - Dledger协议

一阶段:

  1. Leader节点写commitLog文件并更新Leader节点的水位标识(Leader节点记录的最大日志索引值)。
  2. 日志复制分发线程通过writerIndex将日志复制请求按顺序发送给所有Follower节点,并等待Follower节点的响应。Follower节点的响应将回调更新Leader节点缓存中Follower的水位标识。

二阶段:

  1. Leader节点的票据ACK定时检查线程会根据本地缓存的Follower节点集合的水位标识,按照降序取1/2节点数,确定是否达到了半数节点的二阶段提交成功。若达到半数以上的节点已确认,则Leader节点更新commitIndex(提交索引)。
  2. Leader节点通过commitIndex查找挂起的写请求,并进行回调响应客户端。

这样的机制确保了当半数以上节点已经确认二阶段提交成功后,Leader节点会更新commitIndex,从而将提交操作应用到数据中,从而保持了数据的一致性。这种方式类似于2PC中的提交阶段,确保了数据的可靠性和一致性。

commitIndex 落后于 writerIndex,消费者只能消费到commitIndex之前的数据,确保了在数据二阶段提交之前不会被消费,从而保证了数据的一致性和完整性。

免费评分

参与人数 4吾爱币 +9 热心值 +4 收起 理由
wushaominkk + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
caojian162411 + 1 + 1 我很赞同!
为之奈何? + 1 + 1 我很赞同!
hrh123 + 1 用心讨论,共获提升!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

不知道改成啥 发表于 2023-8-31 08:29
楼主图全裂了
 楼主| siegod 发表于 2023-8-31 13:42
不知道改成啥 发表于 2023-8-31 14:12
JTZ 发表于 2023-8-31 14:29
nice 太棒了
fast001 发表于 2023-8-31 14:33
有点长慢慢看
JokerDa 发表于 2023-8-31 22:19
谢谢分享,有点长 慢慢看 哈哈哈
hubspring 发表于 2023-9-1 11:49
牛,一定是个大神把
Ore0 发表于 2024-3-14 11:29
。mark一下,每天看一点
xixiaobo95 发表于 2024-3-15 09:57
学习一下原理,自己搭建一个简易版本的学习效果更好
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则 警告:本版块禁止灌水或回复与主题无关内容,违者重罚!

快速回复 收藏帖子 返回列表 搜索

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-5-2 14:43

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表