深入理解Dubbo原理系列(五)- Dubbo集群容错、路由、负载均衡实现原理
发布日期:2021-05-07 14:50:07 浏览次数:25 分类:精选文章

本文共 27999 字,大约阅读时间需要 93 分钟。

深入理解Dubbo原理系列(五)- Dubbo集群容错、路由、负载均衡实现原理

一. Dubbo容错

1.1 Cluster层

Cluster层可以看做一个集群容错层。包含几个核心接口:

  • Cluster(接口,提供Failover等容错策略)
  • Directory
  • Router
  • LoadBalance

Cluster的整体工作流程可以分为以下几步(基本上和RPC调用的流程是一样的):

  1. 生成Invoker对象,(不同的Cluster生成不同的ClusterInvoker)调用其invoker方法。
  2. 获得可调用的服务列表。
  3. 负载均衡。
  4. RPC调用。

流程图如下:

在这里插入图片描述

1.2 容错机制

1.2.1 Cluster接口关系

容错的接口主要有两大类:

  • Cluster:顶层接口
  • ClusterInvoker:负责实现容错策略部分。

Cluster的实现类,这里称为A类,A和ClusterInvoker是一个一对一的关系。我们知道Cluster接口可以有多种A实现类,那么在每个A中,都要实现其join()方法,并返回一个对应类型的ClusterInvoker实现。

FailoverCluster为例,来看下他的继承关系:

在这里插入图片描述
FailoverCluster类:

public class FailoverCluster extends AbstractCluster {       public final static String NAME = "failover";    @Override    public 
AbstractClusterInvoker
doJoin(Directory
directory) throws RpcException { return new FailoverClusterInvoker<>(directory); }}

最终还是等同于实现了Cluster接口的join()方法

public abstract class AbstractCluster implements Cluster {   	 @Override    public 
Invoker
join(Directory
directory) throws RpcException { return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)); } protected abstract
AbstractClusterInvoker
doJoin(Directory
directory) throws RpcException;}

此外,AbstractClusterInvoker是一个抽象类,封装一些通用的模板逻辑:如获取服务列表、负载均衡、调用服务提供者等。其下面有7大子类,分别实现了不同的集群容错机制:

在这里插入图片描述

1.2.2 容错机制概述

Cluseter的具体实现:用户可以在<dubbo :service><dubbo:reference><dubbo:consumer><dubbo:provider>标签上通过cluster属性设置,其代码实现

Failover策略

Failover是Dubbo的默认容错策略,因为Cluster接口上的@SPI注解中,引用了FailoverCluster。当调用出现失败时,会重试其他服务器。同时,用户还可以通过retries="2"设置重试次数

在这里插入图片描述
代码(后文都是xxxClusterInvoker类下的doInvoke()方法):FailoverClusterInvoker下的doInvoke()方法:

public class FailoverClusterInvoker
extends AbstractClusterInvoker
{ @Override @SuppressWarnings({ "unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List
> invokers, LoadBalance loadbalance) throws RpcException { List
> copyInvokers = invokers; // 1.校验传入的Invoker列表是否为空 checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 2.获取参数配置,从调用的URL中获取对应的retries,即重试次数 int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // 3.初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点 RpcException le = null; // last exception. List
> invoked = new ArrayList
>(copyInvokers.size()); // invoked invokers. Set
providers = new HashSet
(len); // 4.通过for循环完成重试,len=重试次数,成功则返回,失败则继续循环,否则抛出最后的异常 for (int i = 0; i < len; i++) { // 5.若循环次数大于1,则代表有过失败,则检验节点是否被销毁、传入的列表是否为空 if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } // 6.调用select做负载均衡,得到要调用的节点,并记录在上述的集合中 Invoker
invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 7.远程调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { //... } return result; } catch (RpcException e) { //... } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(....); }}

流程图如下:

在这里插入图片描述


Failfast策略

快速失败,当请求失败后,快速返回异常结果,不做任何重试。会对请求 做负载均衡,通常使用在非幕等接口的调用上。

代码:

@Overridepublic Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { // 1.校验参数 checkInvokers(invokers, invocation); // 2.负载均衡 Invoker
invoker = select(loadbalance, invocation, invokers, null); try { // 3.进行远程调用,若捕捉到异常,则封装成RpcException类进行返回。 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(...); }}

Failsafe策略

当出现异常时,直接忽略异常。会对请求做负载均衡。既然是忽略异常,那么这种调用则适用于不关心调用是否成功的一些功能。

代码:

@Overridepublic Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { try { // 1.检查 checkInvokers(invokers, invocation); // 2.负载均衡 Invoker
invoker = select(loadbalance, invocation, invokers, null); // 3.调用,若出现异常,则返回一个空的结果 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore }}

Fallback策略

**请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些异步或最终一致性的请求。**请求会做负载均衡。

代码:

@Overrideprotected Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { Invoker
invoker = null; try { // 1.检查 checkInvokers(invokers, invocation); // 2.负载均衡 invoker = select(loadbalance, invocation, invokers, null); // 3.远程调用 return invoker.invoke(invocation); } catch (Throwable e) { // 4.若出现异常,将invocation保存到重试集合addFailed中,并返回一个空的结果集。 // 定时线程池将addFailed集合中的失败请求拿出来进行重新请求,若成功则从集合中移除。 addFailed(loadbalance, invocation, invokers, invoker); return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore }}

Forking策略

同时调用多个相同的服务,只要其中一个返回,则立即返回结果。若所有请求都失败时,Forking才算失败

代码:

@Override@SuppressWarnings({   "unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { try { // 1.校验参数是否可用 checkInvokers(invokers, invocation); // 初始化一个集合,用于保存真正要调用的Invoker列表。 final List
> selected; // 用户设置最大的并行数:forks final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS); final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { // 2.获得最重要调用的Invoker列表 selected = new ArrayList<>(forks); // selected.size()=实际可以调用的最大服务数,若
<用户设置最大的并行数 则说明可用的服务数小于用户的设置,因此最终要调用的 invoker只能有selected.size()个 while (selected.size() < forks) { 循环调用负载均衡方法,不断得到可调用的invoker,加入到第一步中的invoker列表中。 invoker
invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) { //Avoid add the same invoker several times. selected.add(invoker); } } } // 3.调用前的准备工作。设置要调用的Invoker列表到RPC上下文 // 同时初始化一个异常计数器和一个阻塞队列,用于记录并行调用的结果 RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue
ref = new LinkedBlockingQueue<>(); // 4.执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列。 for (final Invoker
invoker : selected) { executor.execute(() -> { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { // 调用失败,则失败计数器+1 int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } }); } try { // 5.同步等待结果,主线程中会使用阻塞队列的poll(-超时时间“)方法,同步等待阻塞队列中的第一个结果, // 如果是正常结果则返回,如果是异常则抛出。 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(...); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException(...); } } finally { // clear attachments which is binding to current thread. RpcContext.getContext().clearAttachments(); }}

流程图如下:

在这里插入图片描述


Broadcast策略

广播调用所有可用的服务,任意一个节点报错则报错。 不做负载均衡。

代码:

@Override@SuppressWarnings({   "unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { // 1.校验参数并初始化一些对象,用于保存调用过程中产生的异常和结果信息等。 checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 2.循环遍历所有Invoker,直接做RPC调用。 for (Invoker
invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { // 若出错,则先进行日志的打印,只有最后才会抛出异常,最终会抛出最后一个节点的异常(异常会覆盖) exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result;}

Available策略

**遍历所有服务列表,找到第一个可用的节点, 直接请求并返回结果。**如果没有可用的节点,则直接抛出异常。不会做负载均衡。

代码(最简单的实现,即循环遍历):

@Overridepublic Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker
invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers);}

二. 路由的实现

RouterFactory是一个SPI接口,没有设定默认值,来看下他的构造:

@SPIpublic interface RouterFactory {      	    @Adaptive("protocol")    //根据URL中的protocol参数确定要初始化哪一个具体的Router实现     Router getRouter(URL url);}

其实现有:

在这里插入图片描述
本文章主要从三个实现类来进行说明:

  • ConditionRouterFactory
  • FileRouterFactory
  • ScriptRouterFactory

RouterFactory的实现类基本上都非常简单,就是直接new出一个对应的路由类并返回(除了FileRouterFactory),例如:

public class ConditionRouterFactory implements RouterFactory {       public static final String NAME = "condition";    @Override    public Router getRouter(URL url) {           return new ConditionRouter(url);    }}

2.1 条件路由

参数规则

条件路由使用的协议是:condition://协议,例如:

”condition://0.0.0.0/com.fbo.BarService?category=routers&dynamic=false&rule=" + URL.encode(nhost = 10.20.153.10 => host = 10.20.153.11")
参数名称 含义
condition:// 表示路由规则的类型,支持条件路由规则和脚本路由规则, 可扩展,必填
0.0.0.0 表示对所有IP地址生效,如果只想对某个IP的生效,则填入具体IP,必填
com.fdo.BarService 表示只对指定服务生效,必填
category=routers 表示该数据为动态配置类型,必填
dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填
enabled=true 覆盖规则是否生效,可不填,默认生效
fbrce=false 当路由结果为空时,是否强制执行,如果不强制执行,则路由结果为空的路由规则将自动失效,可不填,默认为false
runtime=false 是否在每次调用时执行路由规则,否则只在提供者地址列表 变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,则必须设为true。需要注意设置会影响调用的性能,可不填,默认为false
priority=l 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,默认为0
URL.encode(nhost = 10.20.153.10 => host = 10.20.153.11") 表示路由规则的内容,必填

案例:

method = find* => host = 192.168.237.130
  • 所有调用find开头的方法都会被路由到IP为192.168.237.130的服务节点上。
  • =>之前的部分为消费者匹配条件,将所有参数和消费者的URL进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。
  • =>之后的部分为提供者地址列表的过滤条件,消费者最终只获取过滤后的地址列表。

条件路由的实现

条件路由的实现,可以从上文的ConditionRouterFactory开始,可以看到其返回一个ConditionRouter对象,来看下他的构造函数:

public class ConditionRouter extends AbstractRouter {   	public ConditionRouter(URL url) {           this.url = url;        this.priority = url.getParameter(PRIORITY_KEY, 0);        this.force = url.getParameter(FORCE_KEY, false);        this.enabled = url.getParameter(ENABLED_KEY, true);        // 调用初始化方法        init(url.getParameterAndDecoded(RULE_KEY));    }    public void init(String rule) {           try {               if (rule == null || rule.trim().length() == 0) {                   throw new IllegalArgumentException("Illegal route rule!");            }            rule = rule.replace("consumer.", "").replace("provider.", "");            // 1.以=>为界,把规则分成两段,前面部分为whenRule,即消费者匹配条件            // ---后面部分为thenRule,即提供者地址列表的过滤条件。            int i = rule.indexOf("=>");            String whenRule = i < 0 ? null : rule.substring(0, i).trim();            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();            // 2.分别解析两个路由规则。调用parseRule方法。通过正则表达式不断循环匹配whenRule 和thenRule字符串            // parseRule()方法简单地来说就是:根据key-value之间的分隔符对key-value做分类            // 比如  A=B   A&B   A!=B   A,B  这4种,最终的参数会被封装成一个个MatchPair对象,也就是这里的value            // MatchPair对象有两个作用:            // 1.通配符的匹配和占位符的赋值,MatchPair对象是内部类,里面只有一个isMatch方法,用于判断是否可以匹配规则            // 2.缓存规则,存放于Set集合中,一共有俩集合,一个存放匹配的规则,一个存放不匹配的规则。            Map
when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap
() : parseRule(whenRule); Map
then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } }}

其次,路由的主要实现肯定是看具体子类的route()方法了,该方法的主要功能是过滤出符合路由规则的Invoker列表,即做具体的条件匹配判断:

@Overridepublic 
List
> route(List
> invokers, URL url, Invocation invocation) throws RpcException { // 1. 各个参数的校验,比如规则是否启动?传入的Invoker列表是否为空? if (!enabled) { return invokers; } if (CollectionUtils.isEmpty(invokers)) { return invokers; } try { // 2.如果没有任何的whenRule匹配,即没有规则匹配。 if (!matchWhen(url, invocation)) { return invokers; } List
> result = new ArrayList
>(); // 3.如果whenRule有匹配的,但是thenRule为空,即没有匹配上规则的Invoker,则返回空。 if (thenCondition == null) { logger.warn(...); return result; } for (Invoker
invoker : invokers) { // 4.遍历Invoker列表,通过thenRule找出所有符合规则的Invoker加入集合 if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (!result.isEmpty()) { return result; } else if (force) { logger.warn(...); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers;}

2.2 文件路由

文件路由则是将规则写到文件中,URL中对应的key值则是文件的路径,主要功能就是将文件中的路由脚本读取出来并做解析。

其实现在工厂类本身:

public class FileRouterFactory implements RouterFactory {   	@Override    public Router getRouter(URL url) {           try {               // 1.把类型为file的protocol替换为script类型,例如:            // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=
String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // 2.解析文件的后缀名,后续用于匹配的对应脚本,比如e.g., js, groovy String type = null; String path = url.getPath(); if (path != null) { int i = path.lastIndexOf('.'); if (i > 0) { type = path.substring(i + 1); } } // 3. 、读取文件 String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath()))); // FIXME: this code looks useless boolean runtime = url.getParameter(RUNTIME_KEY, false); // 4.生成路由工厂可以识别的URL,并将一些必要参数添加进去 URL script = URLBuilder.from(url) .setProtocol(protocol) .addParameter(TYPE_KEY, type) .addParameter(RUNTIME_KEY, runtime) .addParameterAndEncoded(RULE_KEY, rule) .build(); // 再次调用路由的工厂,由于前面配置了protocol为script类型,所以这里会使用脚本路由进行解析 // 具体方法的实现看2.3节 return routerFactory.getRouter(script); } catch (IOException e) { throw new IllegalStateException(e.getMessage(), e); } }}

2.3 脚本路由

public class ScriptRouter extends AbstractRouter {   	// 构造	public ScriptRouter(URL url) {           this.url = url;        this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY);		// 初始化脚本执行的引擎,根据脚本的类型,通过Java的ScriptEngineManager来创建不同的脚本执行器并缓存        engine = getEngine(url);        rule = getRule(url);        try {               Compilable compilable = (Compilable) engine;            function = compilable.compile(rule);        } catch (ScriptException e) {               logger.error("route error, rule has been ignored. rule: " + rule +                    ", url: " + RpcContext.getContext().getUrl(), e);        }    }	private ScriptEngine getEngine(URL url) {           String type = url.getParameter(TYPE_KEY, DEFAULT_SCRIPT_TYPE_KEY);        return ENGINES.computeIfAbsent(type, t -> {               ScriptEngine scriptEngine = new ScriptEngineManager().getEngineByName(type);            if (scriptEngine == null) {                   throw new IllegalStateException("unsupported route engine type: " + type);            }            return scriptEngine;        });    }	@Override    public 
List
> route(List
> invokers, URL url, Invocation invocation) throws RpcException { try { // 1.构造需要传入到脚本的一些参数 Bindings bindings = createBindings(invokers, invocation); if (function == null) { return invokers; } // 2. function.eval(bindings)执行脚本,用的是JDK脚本引擎 return getRoutedInvokers(function.eval(bindings)); } catch (ScriptException e) { logger.error("route error, rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e); return invokers; } }}

三. 负载均衡的实现

大家可以回过头来看下上文中,在讲容错策略的时候,有这么一行代码:

Invoker
invoker = select(loadbalance, invocation, invokers, null);

这里其实就是通过负载均衡来获得一个节点,我们知道,负载均衡和LoadBalance类有关,但是这里却不是直接用的他的方法:

@SPI(RandomLoadBalance.NAME)// 默认是按照权重设置随机概率做负载均衡,即随机负载均衡public interface LoadBalance {           @Adaptive("loadbalance")    // 可以在URL中通过loadbalance=xx来动态指定select时的负载均衡算法    
Invoker
select(List
> invokers, URL url, Invocation invocation) throws RpcException;}

类似路由,负载均衡也有一个抽象类来进行特性的封装和方法的实现:

  • 粘滞连接
  • 可用检测
  • 避免重复调用

官方文档:粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者“挂了”,再连接 另一台。

粘滞连接将自动开启延迟连接,以减少长连接数。
<dubbo:protocol name=Hdubbo" sticky=“true” />

一般来说,一个负载均衡的方法逻辑大概有4步:

  1. 检查URL中是否有配置粘滞连接,如果有则使用粘滞连接的Invoker。
  2. 通过ExtensionLoader获取负载均衡的具体实现,并通过负载均衡做节点的选择。(拓展点机制)
  3. 进行节点的重新选择(过滤不可用和已经调用过的节点,最终得到所有可用的节点)。
  4. 再次通过负载均衡选出一个节点然后返回,若找不到可用节点,则返回null。

本篇文章就讲比较常见的4种负载均衡(其他的都是后来的新特性):

  • RandomLoadBalance:随机,按权重设置随机概率。
  • RoundRobinLoadBalance:轮询,按公约后的权重设置轮询比例。
  • LeastActiveLoadBalance:最少活跃调用数,如果活跃数相同则随机调用。
  • ConsistentHashLoadBalance(一致哈希):一致性Hash,相同参数的请求总是发到同一提供者。

这几种跟上述的路由策略一样, 都运用到了设计模式中的模板模式,即:抽象父类AbstractLoadBalance中已经将通用的逻辑实现完成了,但是留下了一个抽象的doSelect()方法让子类去完成。

public abstract class AbstractLoadBalance implements LoadBalance {   	@Override    public 
Invoker
select(List
> invokers, URL url, Invocation invocation) { if (CollectionUtils.isEmpty(invokers)) { return null; } if (invokers.size() == 1) { return invokers.get(0); } return doSelect(invokers, url, invocation); } protected abstract
Invoker
doSelect(List
> invokers, URL url, Invocation invocation);}

doSelect()方法的实现子类:

在这里插入图片描述

3.1 Random负载均衡

代码如下:

@Overrideprotected 
Invoker
doSelect(List
> invokers, URL url, Invocation invocation) { // Invoker列表中实例的数量 int length = invokers.size(); // 计算总权重并判断每个Invoker的权重是否一样,初始化为true boolean sameWeight = true; int[] weights = new int[length]; // 总权重 int totalWeight = 0; // 遍历整个Invoker列表,求和总权重,并且对比每个Invoker的权重,判断所有的Invoker权重是否相同 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // Sum totalWeight += weight; // save for later use weights[i] = totalWeight; if (sameWeight && totalWeight != weight * (i + 1)) { sameWeight = false; } } // 如果权重不同,那么首先得到偏移值 if (totalWeight > 0 && !sameWeight) { // 根据总权重计算出一个随机的偏移量 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 遍历所有的Invoker,选中第一个权重大于该偏移量的Invoker并返回 for (int i = 0; i < length; i++) { if (offset < weights[i]) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length));}

3.2 RoundRobin负载均衡

其实我们可以看出来,普通的Random算法,源码上可以发现,他要求每个节点的权重都是一样的。问题来了:如果有些节点的负载均衡能力很弱怎么办?

因此实际上我们还应该根据节点的能力来进行权重的干预。权重的轮询又分为:

  • 普通权重轮询(会造成某个节点会突然被频繁选中,导致某个节点的流量暴增
  • 平滑权重轮询(在轮询时会穿插选择其他的节点,让整个服务器的选择过程比较均匀
@Overrideprotected 
Invoker
doSelect(List
> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 1.初始化权重缓存map。以每个Invoker的URL为key。WeightedRoundRobin类作为其value // 同时将map保存到全局集合methodWeightMap中,其key:接口+方法名 // WeightedRoundRobin封装了每个Invoker的权重,对象中保存了三个属性 /** * protected static class WeightedRoundRobin { * private int weight;// Invoker设定的权重 * // 考虑到并发场景下某个Invoker会被 同时选中,表示该节点被所有线程选中的权重总和 * private AtomicLong current = new AtomicLong(0); * // 最后一次更新的时间,用于后续缓存超时的判断 * private long lastUpdate; * } */ ConcurrentMap
map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker
selectedInvoker = null; WeightedRoundRobin selectedWRR = null; // 2.遍历所有Invoker for (Invoker
invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); int weight = getWeight(invoker, invocation); // 将每个Invoker的数据进行填充,存储于上述的map中(一开始为null) // 同时获取每个Invoker的预热权重 WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); // 如果预热权重和Invoker设置的权重不相等,则说明还在预热阶段,此时将预热权重进行替换 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 3.紧接着进行平滑轮询,每个Invoker把权重加到自己的current属性上,并更新当前的时间 // 同时累加每个Invoker的权重到总权重中,遍历完成后选出current最大的作为最重要调用的节点 long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } // 4.清除已经没有使用的缓存节点 if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0);}

3.3 LeastActive负载均衡

该算法会记录下每个Invoker的活跃数,每次只从活跃数最少的Invoker里选一个节点。该算法是随机算法的升级版,但是该算法需要配合ActiveLimitFilter过滤器来计算每个接口方法的活跃数(本质)。

@Overrideprotected 
Invoker
doSelect(List
> invokers, URL url, Invocation invocation) { // 1.初始化各类计数器 int length = invokers.size(); // 最小活跃数计数起 int leastActive = -1; // 总权重计数器 int leastCount = 0; // 具有相同最小活动值(leastractive)的调用程序的索引 int[] leastIndexes = new int[length]; // 权重集合 int[] weights = new int[length]; // 所有最不活跃的调用程序的预热权重之和 int totalWeight = 0; // 最不活跃的调用程序的权重 int firstWeight = 0; // 每个最不活跃的调用程序都有相同的权重值?类似于随机算法中的那个布尔变量 boolean sameWeight = true; // Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker
invoker = invokers.get(i); // Get the active number of the invoker int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoker's configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); // save for later use weights[i] = afterWarmup; // 第一次,或者发现有更小的活跃数时 if (leastActive == -1 || active < leastActive) { // 此时不管是第一次还是因为有更小的活跃数,之前的计数都要重新开始 // 这里就是将之前的计数进行置空,因为我们只需要计算最小的活跃数 } // 当前Invoker的活跃数如果与计数相同的话,说明有n个Invoker都是最小计数 // 那么将他们全部保存到集合中,后续就从中根据权重来选择一个节点 else if (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && afterWarmup != firstWeight) { sameWeight = false; } } } // 如果只有一个Invoker则直接返回 if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexes[0]); } // 如果权重不一样,则使用和Random负载均衡一样的权重算法找到一个Invoker并返回 if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 如果权重相同,则直接随机选一个返回 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);}

注:最少活跃的计数是如何知道的?

  1. ActiveLimitFilter中,只要进来一个请求,该方法的调用的计数就会原子性+1。
  2. 而整个Invoker的调用过程中,包含在try-catch-finally语句块中,最后的finally块中,都会将计数进行原子性的-1。那么可以通过该计数器来得到活跃数。

3.4 ConsistentHash负载均衡

ConsistentHash也就是一致哈希,可以让参数相同的请求每次都路由到相同的机器上。 可以让请求相对平均,普通的一致性Hash的示意图如下:

在这里插入图片描述

  1. 将每个服务节点散列到环形上,然后将请求的客户端散列到环上。
  2. 顺时针往前找到的第一个可用节点就是需要调用的节点。

但是这种普通的哈希有一定的局限性,也就是其散列不一定均匀,容易造成某些节点压力过大。而Dubbo框架使用了优化过的Ketama 一致性Hash,通过创建虚拟节点的方式让节点的分布更加均匀。

来看下其代码:

@Overrideprotected 
Invoker
doSelect(List
> invokers, URL url, Invocation invocation) { // 1.获得方法名 String methodName = RpcUtils.getMethodName(invocation); // 2.以接口名+方法名作为key String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 3.将所有可以调用的Invoker列表进行hash int invokersHashCode = invokers.hashCode(); // 4.现在Invoker列表的Hash码和之前的不一样(第三步),说明Invoker列表已经发生了变化,则重新创建Selector ConsistentHashSelector
selector = (ConsistentHashSelector
) selectors.get(key); if (selector == null || selector.identityHashCode != invokersHashCode) { selectors.put(key, new ConsistentHashSelector
(invokers, methodName, invokersHashCode)); // 5.通过selector来获得一个调用的Invoker selector = (ConsistentHashSelector
) selectors.get(key); } return selector.select(invocation);}

再来看下核心的ConsistentHashSelector选择器,来看下他的初始化过程(构造):

  • 可以看出散列的环形是通过一个TreeMap来实现的。
  • 并且所有的真实、虚拟节点都会放到TreeMap
  • 将节点的IP做MD5操作然后作为节点的唯一标识,再对标识做Hash得到TreeMap的key。
  • 最后将可以调用的节点作为TreeMap的value
ConsistentHashSelector(List
> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap
>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 遍历所有节点 for (Invoker
invoker : invokers) { // 得到每个节点的IP String address = invoker.getUrl().getAddress(); // repiicaNumber是生成的虚拟节点数量,默认160个 for (int i = 0; i < replicaNumber / 4; i++) { // 以IP+递增数字做MD5,以此作为节点标识 byte[] digest = Bytes.getMD5(address + i); for (int h = 0; h < 4; h++) { // 将节点的IP做MD5操作然后作为节点的唯一标识,再对标识做Hash得到TreeMap的key。 // Invoker作为value long m = hash(digest, h); virtualInvokers.put(m, invoker); } } }}

那么客户端在调用的时候,只需要对请求的参数做MD5即可。

  • 由于TreeMap是有序的树形结构,所以我们可以调用TreeMapceilingEntry()方法。
  • 用于返回一个大于或等于当前给定key的Entry从而达到顺时针往前找的效果
  • 如果找不到,则使用firstEntry返回第一个节点。

到这里文章就结束了,后期准备再学习下Dubbo的扩展点机制。加油😝

上一篇:深入理解Dubbo原理系列文章导航
下一篇:深入理解Dubbo原理系列(四)- Dubbo核心调用和通信

发表评论

最新留言

很好
[***.229.124.182]2025年03月21日 12时11分56秒