资讯专栏INFORMATION COLUMN

CircuitBreaker模式的Java实现

animabear / 475人阅读

摘要:序状态转换闭开在设定的时间窗口内失败次数达到阈值,由闭开。进入开的同时启动进入半开状态的定时器。半开状态的计数器目前半开状态没有使用时间窗口,仅仅使用连续成功次数来计算,一旦失败,则将断路器设置为状态。

状态转换

闭->开

在设定的时间窗口内失败次数达到阈值,由闭->开。

开->半开

在处于开的状态,对目标的调用做失败返回,进入开的时候,启动计时器,设定时间过后进入半开状态。

半开->开

进入半开状态,会启动一个计数器,记录连续成功的调用次数,超过阈值,进入闭状态。有一次失败则进入开状态,同时清零连续成功调用次数。进入开的同时启动进入半开状态的定时器。

半开->闭

进入半开状态,会启动一个计数器,记录连续成功的调用次数,超过阈值,进入闭状态,同时清零连续成功调用次数。

实现要点

切到开状态启动的定时器

这里如果使用定时线程来做的话,开的线程多,管理比较麻烦,故这里改为维护一个切换到开状态的时间,在每次方法调用,判断是开状态时,判断是否已经过了这个超时阈值,超过的话,进入半开状态。

半开状态的计数器

目前半开状态没有使用时间窗口,仅仅使用连续成功次数来计算,一旦失败,则将断路器设置为open状态。如果连续成功次数达到阈值,则进入close状态。每次进入half-open的状态时,连续成功的计数器清零。

主要代码 断路器状态
public enum CircuitBreakerState {
    CLOSED,    // working normally, calls are transparently passing through
    OPEN,      // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead
    HALF_OPEN  // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN
}
带时间窗口的计数器
/**
 * 带时间窗口的限流计数器
 */
public class LimitCounter {
    private long startTime;
    private long timeIntervalInMs;
    private int maxLimit;
    private AtomicInteger currentCount;

    public LimitCounter(long timeIntervalInMs, int maxLimit) {
        super();
        this.timeIntervalInMs = timeIntervalInMs;
        this.maxLimit = maxLimit;
        startTime = System.currentTimeMillis();
        currentCount = new AtomicInteger(0);
    }


    public int incrAndGet() {
        long currentTime = System.currentTimeMillis();
        if ((startTime + timeIntervalInMs) < currentTime) {
            synchronized (this) {
                if ((startTime + timeIntervalInMs) < currentTime) {
                    startTime = currentTime;
                    currentCount.set(0);
                }
            }
        }
        return currentCount.incrementAndGet();
    }

    public boolean thresholdReached(){
        return currentCount.get() > maxLimit;
    }

    public int get(){
        return currentCount.get();
    }

    public /*synchronized*/ void reset(){
        currentCount.set(0);
    }
}
主要配置
public class CircuitBreakerConfig {

    //closed状态的失败次数阈值
    private int failThreshold = 5;

    //closed状态的失败计数的时间窗口
    private int failCountWindowInMs = 60*1000;

    //处于open状态下进入half-open的超时时间
    private int open2HalfOpenTimeoutInMs = 5*1000;

    //half-open状态下成功次数阈值
    private int consecutiveSuccThreshold = 5;

    private CircuitBreakerConfig(){

    }

    public static CircuitBreakerConfig newDefault(){
        CircuitBreakerConfig config = new CircuitBreakerConfig();
        return config;
    }

    public int getFailThreshold() {
        return failThreshold;
    }

    public void setFailThreshold(int failThreshold) {
        this.failThreshold = failThreshold;
    }

    public int getFailCountWindowInMs() {
        return failCountWindowInMs;
    }

    public void setFailCountWindowInMs(int failCountWindowInMs) {
        this.failCountWindowInMs = failCountWindowInMs;
    }

    public int getOpen2HalfOpenTimeoutInMs() {
        return open2HalfOpenTimeoutInMs;
    }

    public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) {
        this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs;
    }

    public int getConsecutiveSuccThreshold() {
        return consecutiveSuccThreshold;
    }

    public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) {
        this.consecutiveSuccThreshold = consecutiveSuccThreshold;
    }
}
断路器
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    private CircuitBreakerConfig config;

    private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;

    //最近进入open状态的时间
    private volatile long lastOpenedTime;

    //closed状态下失败次数
    private LimitCounter failCount ;

    //half-open状态的连续成功次数,失败立即清零
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);


    //构造器
    public CircuitBreaker(String name,CircuitBreakerConfig config) {
        this.config = config;
        this.name = name;
        failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold());
    }

    //状态判断
    public boolean isOpen(){
        return CircuitBreakerState.OPEN == state;
    }

    public boolean isHalfOpen(){
        return CircuitBreakerState.HALF_OPEN == state;
    }

    public boolean isClosed(){
        return CircuitBreakerState.CLOSED == state;
    }

    //状态操作

    /**
     * closed->open | halfopen -> open
     */
    public void open(){
        lastOpenedTime = System.currentTimeMillis();
        state = CircuitBreakerState.OPEN;
        logger.debug("circuit open,key:{}",name);
    }

    /**
     * open -> halfopen
     */
    public void openHalf(){
        consecutiveSuccCount.set(0);
        state = CircuitBreakerState.HALF_OPEN;
        logger.debug("circuit open-half,key:{}",name);
    }

    /**
     * halfopen -> close
     */
    public void close(){
        failCount.reset();
        state = CircuitBreakerState.CLOSED;
        logger.debug("circuit close,key:{}",name);
    }

    //阈值判断

    /**
     * 是否应该转到half open
     * 前提是 open state
     * @return
     */
    public boolean isOpen2HalfOpenTimeout(){
        return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime;
    }

    /**
     * 是否应该从close转到open
     * @return
     */
    public boolean isCloseFailThresholdReached(){
        return failCount.thresholdReached();
    }

    /**
     * half-open状态下是否达到close的阈值
     * @return
     */
    public boolean isConsecutiveSuccessThresholdReached(){
        return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold();
    }

    //getter
    public void incrFailCount() {
        int count = failCount.incrAndGet();
        logger.debug("incr fail count:{},key:{}",count,name);
    }

    public AtomicInteger getConsecutiveSuccCount() {
        return consecutiveSuccCount;
    }

    public CircuitBreakerState getState() {
        return state;
    }
}
断路器维护的变量
    //最近进入open状态的时间
    private volatile long lastOpenedTime;

    //closed状态下失败次数
    private LimitCounter failCount ;

    //half-open状态的连续成功次数,失败立即清零
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);
基于jdk代理的拦截
public class CircuitBreakerInvocationHandler implements InvocationHandler{

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class);

    private Object target;

    public CircuitBreakerInvocationHandler(Object target) {
        this.target = target;
    }

    //动态生成代理对象
    public Object proxy(){
        return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this);
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class);
        if(breakerAnno == null){
            return method.invoke(target,args);
        }
        Class[] noTripExs = breakerAnno.noTripExceptions();
        int timeout = breakerAnno.timeoutInMs();
        int interval = breakerAnno.failCountWindowInMs();
        int failThreshold = breakerAnno.failThreshold();
        CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault();
        if(interval != -1){
            cfg.setFailCountWindowInMs(interval);
        }
        if(failThreshold != -1){
            cfg.setFailThreshold(failThreshold);
        }

        String key = target.getClass().getSimpleName() + method.getName();
        CircuitBreaker breaker = CircuitBreakerRegister.get(key);
        if(breaker == null){
            breaker = new CircuitBreaker(key,cfg);
            CircuitBreakerRegister.putIfAbsent(key,breaker);
        }

        Object returnValue = null;

        logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString());
        //breaker state
        if(breaker.isOpen()){
            //判断是否该进入half open状态
            if(breaker.isOpen2HalfOpenTimeout()){
                //进入half open状态
                breaker.openHalf();
                logger.debug("method:{} into half open",method.toGenericString());
                returnValue = processHalfOpen(breaker,method,args,noTripExs);
            }else{
                throw new CircuitBreakerOpenException(method.toGenericString());
            }
        }else if(breaker.isClosed()){
            try{
                returnValue = method.invoke(target,args);
//                这里看情况是否重置标志
//                breaker.close();
            }catch (Throwable t){
                if(isNoTripException(t,noTripExs)){
                    throw t;
                }else{
                    //增加计数
                    breaker.incrFailCount();
                    if(breaker.isCloseFailThresholdReached()){
                        //触发阈值,打开
                        logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString());
                        breaker.open();
                        throw new CircuitBreakerOpenException(method.toGenericString());
                    }else{
                        throw t;
                    }
                }
            }

        }else if(breaker.isHalfOpen()){
            returnValue = processHalfOpen(breaker,method,args,noTripExs);
        }

        return returnValue;
    }

    private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class[] noTripExs) throws Throwable {
        try{
            Object returnValue = method.invoke(target,args);
            breaker.getConsecutiveSuccCount().incrementAndGet();
            if(breaker.isConsecutiveSuccessThresholdReached()){
                //调用成功则进入close状态
                breaker.close();
            }
            return returnValue;
        }catch (Throwable t){
            if(isNoTripException(t,noTripExs)){
                breaker.getConsecutiveSuccCount().incrementAndGet();
                if(breaker.isConsecutiveSuccessThresholdReached()){
                    breaker.close();
                }
                throw t;
            }else{
                breaker.open();
                throw new CircuitBreakerOpenException(method.toGenericString(), t);
            }
        }
    }

    private boolean isNoTripException(Throwable t,Class[] noTripExceptions){
        if(noTripExceptions == null || noTripExceptions.length == 0){
            return false;
        }
        for(Class ex:noTripExceptions){
            //是否是抛出异常t的父类
            //t java.lang.reflect.InvocationTargetException
            if(ex.isAssignableFrom(t.getCause().getClass())){
                return true;
            }
        }
        return false;
    }
}
github工程circuit-breaker
参考

martinfowler-CircuitBreaker

microsoft-Circuit Breaker Pattern(必读)

cloud-design-patterns-断路器模式

HystrixCircuitBreaker

熔断器设计模式(实现参考)

Creating a circuit breaker with Spring AOP(实现参考)

alenegro81/CircuitBreaker(参考jdk代理实现)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65827.html

相关文章

  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西红柿 评论0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西红柿 评论0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    yangrd 评论0 收藏0

发表评论

0条评论

animabear

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<