
本文共 27999 字,大约阅读时间需要 93 分钟。
深入理解Dubbo原理系列(五)- Dubbo集群容错、路由、负载均衡实现原理
一. Dubbo容错
1.1 Cluster层
Cluster层可以看做一个集群容错层。包含几个核心接口:
- Cluster(接口,提供Failover等容错策略)
- Directory
- Router
- LoadBalance
Cluster的整体工作流程可以分为以下几步(基本上和RPC调用的流程是一样的):
- 生成Invoker对象,(不同的Cluster生成不同的ClusterInvoker)调用其invoker方法。
- 获得可调用的服务列表。
- 负载均衡。
- RPC调用。
流程图如下:

1.2 容错机制
1.2.1 Cluster接口关系
容错的接口主要有两大类:
Cluster
:顶层接口ClusterInvoker
:负责实现容错策略部分。
Cluster
的实现类,这里称为A类,A和ClusterInvoker
是一个一对一的关系。我们知道Cluster
接口可以有多种A实现类,那么在每个A中,都要实现其join()方法,并返回一个对应类型的ClusterInvoker
实现。
以FailoverCluster
为例,来看下他的继承关系:

FailoverCluster
类: public class FailoverCluster extends AbstractCluster { public final static String NAME = "failover"; @Override publicAbstractClusterInvoker doJoin(Directory directory) throws RpcException { return new FailoverClusterInvoker<>(directory); }}
最终还是等同于实现了Cluster
接口的join()
方法
public abstract class AbstractCluster implements Cluster { @Override publicInvoker join(Directory directory) throws RpcException { return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)); } protected abstract AbstractClusterInvoker doJoin(Directory directory) throws RpcException;}
此外,AbstractClusterInvoker是一个抽象类,封装一些通用的模板逻辑:如获取服务列表、负载均衡、调用服务提供者等。其下面有7大子类,分别实现了不同的集群容错机制:

1.2.2 容错机制概述
Cluseter的具体实现:用户可以在<dubbo :service>
、<dubbo:reference>
、<dubbo:consumer>
、<dubbo:provider>
标签上通过cluster
属性设置,其代码实现
Failover策略
Failover是Dubbo的默认容错策略,因为Cluster
接口上的@SPI注解中,引用了FailoverCluster
。当调用出现失败时,会重试其他服务器。同时,用户还可以通过retries="2"
设置重试次数。

xxxClusterInvoker
类下的doInvoke()
方法):FailoverClusterInvoker
下的doInvoke()
方法: public class FailoverClusterInvokerextends AbstractClusterInvoker { @Override @SuppressWarnings({ "unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List > invokers, LoadBalance loadbalance) throws RpcException { List > copyInvokers = invokers; // 1.校验传入的Invoker列表是否为空 checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 2.获取参数配置,从调用的URL中获取对应的retries,即重试次数 int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // 3.初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点 RpcException le = null; // last exception. List > invoked = new ArrayList >(copyInvokers.size()); // invoked invokers. Set providers = new HashSet (len); // 4.通过for循环完成重试,len=重试次数,成功则返回,失败则继续循环,否则抛出最后的异常 for (int i = 0; i < len; i++) { // 5.若循环次数大于1,则代表有过失败,则检验节点是否被销毁、传入的列表是否为空 if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } // 6.调用select做负载均衡,得到要调用的节点,并记录在上述的集合中 Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 7.远程调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { //... } return result; } catch (RpcException e) { //... } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(....); }}
流程图如下:

Failfast策略
快速失败,当请求失败后,快速返回异常结果,不做任何重试。会对请求 做负载均衡,通常使用在非幕等接口的调用上。
代码:
@Overridepublic Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { // 1.校验参数 checkInvokers(invokers, invocation); // 2.负载均衡 Invoker invoker = select(loadbalance, invocation, invokers, null); try { // 3.进行远程调用,若捕捉到异常,则封装成RpcException类进行返回。 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(...); }}
Failsafe策略
当出现异常时,直接忽略异常。会对请求做负载均衡。既然是忽略异常,那么这种调用则适用于不关心调用是否成功的一些功能。
代码:
@Overridepublic Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { try { // 1.检查 checkInvokers(invokers, invocation); // 2.负载均衡 Invoker invoker = select(loadbalance, invocation, invokers, null); // 3.调用,若出现异常,则返回一个空的结果 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore }}
Fallback策略
**请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些异步或最终一致性的请求。**请求会做负载均衡。
代码:
@Overrideprotected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { Invoker invoker = null; try { // 1.检查 checkInvokers(invokers, invocation); // 2.负载均衡 invoker = select(loadbalance, invocation, invokers, null); // 3.远程调用 return invoker.invoke(invocation); } catch (Throwable e) { // 4.若出现异常,将invocation保存到重试集合addFailed中,并返回一个空的结果集。 // 定时线程池将addFailed集合中的失败请求拿出来进行重新请求,若成功则从集合中移除。 addFailed(loadbalance, invocation, invokers, invoker); return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore }}
Forking策略
同时调用多个相同的服务,只要其中一个返回,则立即返回结果。若所有请求都失败时,Forking才算失败
代码:
@Override@SuppressWarnings({ "unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { try { // 1.校验参数是否可用 checkInvokers(invokers, invocation); // 初始化一个集合,用于保存真正要调用的Invoker列表。 final List > selected; // 用户设置最大的并行数:forks final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS); final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { // 2.获得最重要调用的Invoker列表 selected = new ArrayList<>(forks); // selected.size()=实际可以调用的最大服务数,若 <用户设置最大的并行数 则说明可用的服务数小于用户的设置,因此最终要调用的 invoker只能有selected.size()个 while (selected.size() < forks) { 循环调用负载均衡方法,不断得到可调用的invoker,加入到第一步中的invoker列表中。 invoker invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) { //Avoid add the same invoker several times. selected.add(invoker); } } } // 3.调用前的准备工作。设置要调用的Invoker列表到RPC上下文 // 同时初始化一个异常计数器和一个阻塞队列,用于记录并行调用的结果 RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue 用户设置最大的并行数>
流程图如下:

Broadcast策略
广播调用所有可用的服务,任意一个节点报错则报错。 不做负载均衡。
代码:
@Override@SuppressWarnings({ "unchecked", "rawtypes"})public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { // 1.校验参数并初始化一些对象,用于保存调用过程中产生的异常和结果信息等。 checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 2.循环遍历所有Invoker,直接做RPC调用。 for (Invoker invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { // 若出错,则先进行日志的打印,只有最后才会抛出异常,最终会抛出最后一个节点的异常(异常会覆盖) exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result;}
Available策略
**遍历所有服务列表,找到第一个可用的节点, 直接请求并返回结果。**如果没有可用的节点,则直接抛出异常。不会做负载均衡。
代码(最简单的实现,即循环遍历):
@Overridepublic Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers);}
二. 路由的实现
RouterFactory
是一个SPI接口,没有设定默认值,来看下他的构造:
@SPIpublic interface RouterFactory { @Adaptive("protocol") //根据URL中的protocol参数确定要初始化哪一个具体的Router实现 Router getRouter(URL url);}
其实现有:

ConditionRouterFactory
FileRouterFactory
ScriptRouterFactory
RouterFactory
的实现类基本上都非常简单,就是直接new出一个对应的路由类并返回(除了FileRouterFactory
),例如:
public class ConditionRouterFactory implements RouterFactory { public static final String NAME = "condition"; @Override public Router getRouter(URL url) { return new ConditionRouter(url); }}
2.1 条件路由
参数规则
条件路由使用的协议是:condition://协议
,例如:
”condition://0.0.0.0/com.fbo.BarService?category=routers&dynamic=false&rule=" + URL.encode(nhost = 10.20.153.10 => host = 10.20.153.11")
参数名称 | 含义 |
---|---|
condition:// | 表示路由规则的类型,支持条件路由规则和脚本路由规则, 可扩展,必填 |
0.0.0.0 | 表示对所有IP地址生效,如果只想对某个IP的生效,则填入具体IP,必填 |
com.fdo.BarService | 表示只对指定服务生效,必填 |
category=routers | 表示该数据为动态配置类型,必填 |
dynamic=false | 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填 |
enabled=true | 覆盖规则是否生效,可不填,默认生效 |
fbrce=false | 当路由结果为空时,是否强制执行,如果不强制执行,则路由结果为空的路由规则将自动失效,可不填,默认为false |
runtime=false | 是否在每次调用时执行路由规则,否则只在提供者地址列表 变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,则必须设为true。需要注意设置会影响调用的性能,可不填,默认为false |
priority=l | 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,默认为0 |
URL.encode(nhost = 10.20.153.10 => host = 10.20.153.11") | 表示路由规则的内容,必填 |
案例:
method = find* => host = 192.168.237.130
- 所有调用
find
开头的方法都会被路由到IP为192.168.237.130的服务节点上。 =>
之前的部分为消费者匹配条件,将所有参数和消费者的URL进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。=>
之后的部分为提供者地址列表的过滤条件,消费者最终只获取过滤后的地址列表。
条件路由的实现
条件路由的实现,可以从上文的ConditionRouterFactory
开始,可以看到其返回一个ConditionRouter
对象,来看下他的构造函数:
public class ConditionRouter extends AbstractRouter { public ConditionRouter(URL url) { this.url = url; this.priority = url.getParameter(PRIORITY_KEY, 0); this.force = url.getParameter(FORCE_KEY, false); this.enabled = url.getParameter(ENABLED_KEY, true); // 调用初始化方法 init(url.getParameterAndDecoded(RULE_KEY)); } public void init(String rule) { try { if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } rule = rule.replace("consumer.", "").replace("provider.", ""); // 1.以=>为界,把规则分成两段,前面部分为whenRule,即消费者匹配条件 // ---后面部分为thenRule,即提供者地址列表的过滤条件。 int i = rule.indexOf("=>"); String whenRule = i < 0 ? null : rule.substring(0, i).trim(); String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); // 2.分别解析两个路由规则。调用parseRule方法。通过正则表达式不断循环匹配whenRule 和thenRule字符串 // parseRule()方法简单地来说就是:根据key-value之间的分隔符对key-value做分类 // 比如 A=B A&B A!=B A,B 这4种,最终的参数会被封装成一个个MatchPair对象,也就是这里的value // MatchPair对象有两个作用: // 1.通配符的匹配和占位符的赋值,MatchPair对象是内部类,里面只有一个isMatch方法,用于判断是否可以匹配规则 // 2.缓存规则,存放于Set集合中,一共有俩集合,一个存放匹配的规则,一个存放不匹配的规则。 Mapwhen = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap () : parseRule(whenRule); Map then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } }}
其次,路由的主要实现肯定是看具体子类的route()
方法了,该方法的主要功能是过滤出符合路由规则的Invoker列表,即做具体的条件匹配判断:
@OverridepublicList > route(List > invokers, URL url, Invocation invocation) throws RpcException { // 1. 各个参数的校验,比如规则是否启动?传入的Invoker列表是否为空? if (!enabled) { return invokers; } if (CollectionUtils.isEmpty(invokers)) { return invokers; } try { // 2.如果没有任何的whenRule匹配,即没有规则匹配。 if (!matchWhen(url, invocation)) { return invokers; } List > result = new ArrayList >(); // 3.如果whenRule有匹配的,但是thenRule为空,即没有匹配上规则的Invoker,则返回空。 if (thenCondition == null) { logger.warn(...); return result; } for (Invoker invoker : invokers) { // 4.遍历Invoker列表,通过thenRule找出所有符合规则的Invoker加入集合 if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (!result.isEmpty()) { return result; } else if (force) { logger.warn(...); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers;}
2.2 文件路由
文件路由则是将规则写到文件中,URL中对应的key值则是文件的路径,主要功能就是将文件中的路由脚本读取出来并做解析。
其实现在工厂类本身:
public class FileRouterFactory implements RouterFactory { @Override public Router getRouter(URL url) { try { // 1.把类型为file的protocol替换为script类型,例如: // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // 2.解析文件的后缀名,后续用于匹配的对应脚本,比如e.g., js, groovy String type = null; String path = url.getPath(); if (path != null) { int i = path.lastIndexOf('.'); if (i > 0) { type = path.substring(i + 1); } } // 3. 、读取文件 String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath()))); // FIXME: this code looks useless boolean runtime = url.getParameter(RUNTIME_KEY, false); // 4.生成路由工厂可以识别的URL,并将一些必要参数添加进去 URL script = URLBuilder.from(url) .setProtocol(protocol) .addParameter(TYPE_KEY, type) .addParameter(RUNTIME_KEY, runtime) .addParameterAndEncoded(RULE_KEY, rule) .build(); // 再次调用路由的工厂,由于前面配置了protocol为script类型,所以这里会使用脚本路由进行解析 // 具体方法的实现看2.3节 return routerFactory.getRouter(script); } catch (IOException e) { throw new IllegalStateException(e.getMessage(), e); } }}
2.3 脚本路由
public class ScriptRouter extends AbstractRouter { // 构造 public ScriptRouter(URL url) { this.url = url; this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY); // 初始化脚本执行的引擎,根据脚本的类型,通过Java的ScriptEngineManager来创建不同的脚本执行器并缓存 engine = getEngine(url); rule = getRule(url); try { Compilable compilable = (Compilable) engine; function = compilable.compile(rule); } catch (ScriptException e) { logger.error("route error, rule has been ignored. rule: " + rule + ", url: " + RpcContext.getContext().getUrl(), e); } } private ScriptEngine getEngine(URL url) { String type = url.getParameter(TYPE_KEY, DEFAULT_SCRIPT_TYPE_KEY); return ENGINES.computeIfAbsent(type, t -> { ScriptEngine scriptEngine = new ScriptEngineManager().getEngineByName(type); if (scriptEngine == null) { throw new IllegalStateException("unsupported route engine type: " + type); } return scriptEngine; }); } @Override publicList > route(List > invokers, URL url, Invocation invocation) throws RpcException { try { // 1.构造需要传入到脚本的一些参数 Bindings bindings = createBindings(invokers, invocation); if (function == null) { return invokers; } // 2. function.eval(bindings)执行脚本,用的是JDK脚本引擎 return getRoutedInvokers(function.eval(bindings)); } catch (ScriptException e) { logger.error("route error, rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e); return invokers; } }}
三. 负载均衡的实现
大家可以回过头来看下上文中,在讲容错策略的时候,有这么一行代码:
Invokerinvoker = select(loadbalance, invocation, invokers, null);
这里其实就是通过负载均衡来获得一个节点,我们知道,负载均衡和LoadBalance
类有关,但是这里却不是直接用的他的方法:
@SPI(RandomLoadBalance.NAME)// 默认是按照权重设置随机概率做负载均衡,即随机负载均衡public interface LoadBalance { @Adaptive("loadbalance") // 可以在URL中通过loadbalance=xx来动态指定select时的负载均衡算法Invoker select(List > invokers, URL url, Invocation invocation) throws RpcException;}
类似路由,负载均衡也有一个抽象类来进行特性的封装和方法的实现:
- 粘滞连接
- 可用检测
- 避免重复调用
官方文档:粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者“挂了”,再连接 另一台。
粘滞连接将自动开启延迟连接,以减少长连接数。 <dubbo:protocol name=Hdubbo" sticky=“true” />
一般来说,一个负载均衡的方法逻辑大概有4步:
- 检查URL中是否有配置粘滞连接,如果有则使用粘滞连接的Invoker。
- 通过
ExtensionLoader
获取负载均衡的具体实现,并通过负载均衡做节点的选择。(拓展点机制) - 进行节点的重新选择(过滤不可用和已经调用过的节点,最终得到所有可用的节点)。
- 再次通过负载均衡选出一个节点然后返回,若找不到可用节点,则返回null。
本篇文章就讲比较常见的4种负载均衡(其他的都是后来的新特性):
RandomLoadBalance
:随机,按权重设置随机概率。RoundRobinLoadBalance
:轮询,按公约后的权重设置轮询比例。LeastActiveLoadBalance
:最少活跃调用数,如果活跃数相同则随机调用。ConsistentHashLoadBalance
(一致哈希):一致性Hash,相同参数的请求总是发到同一提供者。
这几种跟上述的路由策略一样, 都运用到了设计模式中的模板模式,即:抽象父类AbstractLoadBalance
中已经将通用的逻辑实现完成了,但是留下了一个抽象的doSelect()
方法让子类去完成。
public abstract class AbstractLoadBalance implements LoadBalance { @Override publicInvoker select(List > invokers, URL url, Invocation invocation) { if (CollectionUtils.isEmpty(invokers)) { return null; } if (invokers.size() == 1) { return invokers.get(0); } return doSelect(invokers, url, invocation); } protected abstract Invoker doSelect(List > invokers, URL url, Invocation invocation);}
doSelect()
方法的实现子类:

3.1 Random负载均衡
代码如下:
@OverrideprotectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { // Invoker列表中实例的数量 int length = invokers.size(); // 计算总权重并判断每个Invoker的权重是否一样,初始化为true boolean sameWeight = true; int[] weights = new int[length]; // 总权重 int totalWeight = 0; // 遍历整个Invoker列表,求和总权重,并且对比每个Invoker的权重,判断所有的Invoker权重是否相同 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // Sum totalWeight += weight; // save for later use weights[i] = totalWeight; if (sameWeight && totalWeight != weight * (i + 1)) { sameWeight = false; } } // 如果权重不同,那么首先得到偏移值 if (totalWeight > 0 && !sameWeight) { // 根据总权重计算出一个随机的偏移量 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 遍历所有的Invoker,选中第一个权重大于该偏移量的Invoker并返回 for (int i = 0; i < length; i++) { if (offset < weights[i]) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length));}
3.2 RoundRobin负载均衡
其实我们可以看出来,普通的Random算法,源码上可以发现,他要求每个节点的权重都是一样的。问题来了:如果有些节点的负载均衡能力很弱怎么办?
因此实际上我们还应该根据节点的能力来进行权重的干预。权重的轮询又分为:
- 普通权重轮询(会造成某个节点会突然被频繁选中,导致某个节点的流量暴增)
- 平滑权重轮询(在轮询时会穿插选择其他的节点,让整个服务器的选择过程比较均匀)
@OverrideprotectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 1.初始化权重缓存map。以每个Invoker的URL为key。WeightedRoundRobin类作为其value // 同时将map保存到全局集合methodWeightMap中,其key:接口+方法名 // WeightedRoundRobin封装了每个Invoker的权重,对象中保存了三个属性 /** * protected static class WeightedRoundRobin { * private int weight;// Invoker设定的权重 * // 考虑到并发场景下某个Invoker会被 同时选中,表示该节点被所有线程选中的权重总和 * private AtomicLong current = new AtomicLong(0); * // 最后一次更新的时间,用于后续缓存超时的判断 * private long lastUpdate; * } */ ConcurrentMap map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); Invoker selectedInvoker = null; WeightedRoundRobin selectedWRR = null; // 2.遍历所有Invoker for (Invoker invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); int weight = getWeight(invoker, invocation); // 将每个Invoker的数据进行填充,存储于上述的map中(一开始为null) // 同时获取每个Invoker的预热权重 WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> { WeightedRoundRobin wrr = new WeightedRoundRobin(); wrr.setWeight(weight); return wrr; }); // 如果预热权重和Invoker设置的权重不相等,则说明还在预热阶段,此时将预热权重进行替换 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 3.紧接着进行平滑轮询,每个Invoker把权重加到自己的current属性上,并更新当前的时间 // 同时累加每个Invoker的权重到总权重中,遍历完成后选出current最大的作为最重要调用的节点 long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } totalWeight += weight; } // 4.清除已经没有使用的缓存节点 if (invokers.size() != map.size()) { map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); } if (selectedInvoker != null) { selectedWRR.sel(totalWeight); return selectedInvoker; } // should not happen here return invokers.get(0);}
3.3 LeastActive负载均衡
该算法会记录下每个Invoker的活跃数,每次只从活跃数最少的Invoker里选一个节点。该算法是随机算法的升级版,但是该算法需要配合ActiveLimitFilter
过滤器来计算每个接口方法的活跃数(本质)。
@OverrideprotectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { // 1.初始化各类计数器 int length = invokers.size(); // 最小活跃数计数起 int leastActive = -1; // 总权重计数器 int leastCount = 0; // 具有相同最小活动值(leastractive)的调用程序的索引 int[] leastIndexes = new int[length]; // 权重集合 int[] weights = new int[length]; // 所有最不活跃的调用程序的预热权重之和 int totalWeight = 0; // 最不活跃的调用程序的权重 int firstWeight = 0; // 每个最不活跃的调用程序都有相同的权重值?类似于随机算法中的那个布尔变量 boolean sameWeight = true; // Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); // Get the active number of the invoker int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoker's configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); // save for later use weights[i] = afterWarmup; // 第一次,或者发现有更小的活跃数时 if (leastActive == -1 || active < leastActive) { // 此时不管是第一次还是因为有更小的活跃数,之前的计数都要重新开始 // 这里就是将之前的计数进行置空,因为我们只需要计算最小的活跃数 } // 当前Invoker的活跃数如果与计数相同的话,说明有n个Invoker都是最小计数 // 那么将他们全部保存到集合中,后续就从中根据权重来选择一个节点 else if (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && afterWarmup != firstWeight) { sameWeight = false; } } } // 如果只有一个Invoker则直接返回 if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexes[0]); } // 如果权重不一样,则使用和Random负载均衡一样的权重算法找到一个Invoker并返回 if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 如果权重相同,则直接随机选一个返回 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);}
注:最少活跃的计数是如何知道的?
- 在
ActiveLimitFilter
中,只要进来一个请求,该方法的调用的计数就会原子性+1。 - 而整个Invoker的调用过程中,包含在
try-catch-finally
语句块中,最后的finally块中,都会将计数进行原子性的-1。那么可以通过该计数器来得到活跃数。
3.4 ConsistentHash负载均衡
ConsistentHash也就是一致哈希,可以让参数相同的请求每次都路由到相同的机器上。 可以让请求相对平均,普通的一致性Hash的示意图如下:

- 将每个服务节点散列到环形上,然后将请求的客户端散列到环上。
- 顺时针往前找到的第一个可用节点就是需要调用的节点。
但是这种普通的哈希有一定的局限性,也就是其散列不一定均匀,容易造成某些节点压力过大。而Dubbo框架使用了优化过的Ketama 一致性Hash,通过创建虚拟节点的方式让节点的分布更加均匀。
来看下其代码:
@OverrideprotectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { // 1.获得方法名 String methodName = RpcUtils.getMethodName(invocation); // 2.以接口名+方法名作为key String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 3.将所有可以调用的Invoker列表进行hash int invokersHashCode = invokers.hashCode(); // 4.现在Invoker列表的Hash码和之前的不一样(第三步),说明Invoker列表已经发生了变化,则重新创建Selector ConsistentHashSelector selector = (ConsistentHashSelector ) selectors.get(key); if (selector == null || selector.identityHashCode != invokersHashCode) { selectors.put(key, new ConsistentHashSelector (invokers, methodName, invokersHashCode)); // 5.通过selector来获得一个调用的Invoker selector = (ConsistentHashSelector ) selectors.get(key); } return selector.select(invocation);}
再来看下核心的ConsistentHashSelector
选择器,来看下他的初始化过程(构造):
- 可以看出散列的环形是通过一个
TreeMap
来实现的。 - 并且所有的真实、虚拟节点都会放到
TreeMap
中 - 将节点的IP做MD5操作然后作为节点的唯一标识,再对标识做Hash得到
TreeMap
的key。 - 最后将可以调用的节点作为
TreeMap
的value
ConsistentHashSelector(List> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap >(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 遍历所有节点 for (Invoker invoker : invokers) { // 得到每个节点的IP String address = invoker.getUrl().getAddress(); // repiicaNumber是生成的虚拟节点数量,默认160个 for (int i = 0; i < replicaNumber / 4; i++) { // 以IP+递增数字做MD5,以此作为节点标识 byte[] digest = Bytes.getMD5(address + i); for (int h = 0; h < 4; h++) { // 将节点的IP做MD5操作然后作为节点的唯一标识,再对标识做Hash得到TreeMap的key。 // Invoker作为value long m = hash(digest, h); virtualInvokers.put(m, invoker); } } }}
那么客户端在调用的时候,只需要对请求的参数做MD5即可。
- 由于
TreeMap
是有序的树形结构,所以我们可以调用TreeMap
的ceilingEntry()
方法。 - 用于返回一个大于或等于当前给定key的
Entry
。从而达到顺时针往前找的效果。 - 如果找不到,则使用firstEntry返回第一个节点。
到这里文章就结束了,后期准备再学习下Dubbo的扩展点机制。加油😝
发表评论
最新留言
关于作者
