本文将详细分析< dubbo:service executes=”“/>与< dubbo:reference actives = “”/>的实现机制,深入探讨Dubbo自身的保护机制。

1、源码分析ExecuteLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )


过滤器作用

服务调用方并发度控制。

使用场景

对Dubbo服务提供者实现的一种保护机制,控制每个服务的最大并发度。

阻断条件

当服务调用超过允许的并发度后,直接抛出RpcException异常。

接下来源码分析ExecuteLimitFilter的实现细节。

ExecuteLimitFilter#invoke


代码@1:从服务提供者列表中获取参数executes的值,如果该值小于等于0,表示不启用并发度控制,直接沿着调用链进行调用。

代码@2:根据服务提供者url和服务调用方法名,获取RpcStatus。


这里是并发容器ConcurrentHashMap的经典使用,从 这里可以看出ConcurrentMap< String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存储结构为 { 服务提供者URL唯一字符串:{方法名:RpcStatus} }。

代码@3:根据服务提供者配置的最大并发度,创建该服务该方法对应的信号量对象。


使用了双重检测来创建executesLimit 信号量。

代码@4:如果获取不到锁,并不会阻塞等待,而是直接抛出RpcException,服务端的策略是快速抛出异常,供服务调用方(消费者)根据集群策略进行执行,例如重试其他服务提供者。

代码@5:执行真实的服务调用。

代码@6:如果成功申请到信号量,在服务调用结束后,释放信号量。

总结:< dubbo:service executes=”“/>的含义是,针对每个服务每个方法的最大并发度。如果超过该值,则直接抛出RpcException。


2、源码分析ActiveLimitFilter

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )


过滤器作用

消费端调用服务的并发控制。

使用场景

控制同一个消费端对服务端某一服务的并发调用度,通常该值应该小于< dubbo:service executes=”“/>

阻断条件

非阻断,但如果超过允许的并发度会阻塞,超过超时时间后将不再调用服务,而是直接抛出超时。

源码分析ActiveLimitFilter的实现原理:

ActiveLimitFilter#invoke

代码@1:从Invoker中获取消息端URL中的配置的actives参数,为什么从Invoker中获取的Url是消费端的Url呢?这是因为在消费端根据服务提供者URL创建调用Invoker时,会用服务提供者URL,然后合并消费端的配置属性,其优先级 -D > 消费端 > 服务端。其代码位于:、

RegistryDirectory#toInvokers

URL url = mergeUrl(providerUrl);

代码@2:根据服务提供者URL和调用服务提供者方法,获取RpcStatus。

代码@3:获取接口调用的超时时间,默认为1s。

代码@4:获取当前消费者,针对特定服务,特定方法的并发调用度,active值。

代码@5:如果当前的并发 调用大于等于允许的最大值,则针对该RpcStatus申请锁,并调用其wait(timeout)进行等待,也就是在接口调用超时时间内,还是未被唤醒,则直接抛出超时异常。

代码@6:判断被唤醒的原因是因为等待超时,还是由于调用结束,释放了”名额“,如果是超时唤醒,则直接抛出异常。

代码@7:在一次服务调用前,先将 服务名+方法名对应的RpcStatus的active加一。

代码@8:执行RPC服务调用。

代码@9:记录成功调用或失败调用,并将active减一。

代码@10:最终成功执行,如果开启了actives机制(dubbo:referecnce actives=”“)时,唤醒等待者。

总结:< dubbo:reference actives=”“/> 是控制消费端对 单个服务提供者单个服务允许调用的最大并发度。该值的取值不应该大于< dubbo:service executes=”“/>的值,并且如果消费者机器的配置,如果性能不尽相同,不建议对该值进行设置。