CircuitBreaker模式的Java实现

390 查看

状态转换

  • 闭->开
    在设定的时间窗口内,失败次数超过阈值(代码中的判断为 count > maxLimit)时,由闭->开。

  • 开->半开
    处于开状态时,对目标的调用直接失败返回(抛出 CircuitBreakerOpenException)。进入开状态时记录当前时间作为计时起点,超过设定时间后进入半开状态。

  • 半开->开
    进入半开状态时,会启用一个计数器,记录连续成功的调用次数,达到阈值则进入闭状态。只要有一次失败就立即进入开状态,连续成功调用次数会在下次进入半开状态时清零。进入开状态的同时重新开始进入半开状态的计时。

  • 半开->闭
    进入半开状态时,会启用一个计数器,记录连续成功的调用次数,达到阈值(>=)后进入闭状态,同时重置闭状态下的失败计数。连续成功调用次数在下次进入半开状态时清零。

实现要点

  • 切到开状态启动的定时器
    这里如果使用定时线程来做的话,开的线程多,管理比较麻烦,故这里改为维护一个切换到开状态的时间,在每次方法调用,判断是开状态时,判断是否已经过了这个超时阈值,超过的话,进入半开状态。

  • 半开状态的计数器
    目前半开状态没有使用时间窗口,仅仅使用连续成功次数来计算,一旦失败,则将断路器设置为open状态。如果连续成功次数达到阈值,则进入close状态。每次进入half-open的状态时,连续成功的计数器清零。

主要代码

断路器状态

public enum CircuitBreakerState {
    CLOSED,    // normal operation: calls pass straight through to the target while failures are counted
    OPEN,      // calls are rejected with a CircuitBreakerOpenException until the open -> half-open timeout has elapsed
    HALF_OPEN  // trial mode: calls pass through; a tripping failure reverts to OPEN, enough consecutive successes move to CLOSED
}

带时间窗口的计数器

/**
 * Counter bound to a time window: once the window has elapsed, the next
 * increment restarts the window and counts from zero again.
 *
 * <p>The window starts at construction time, or at the most recent
 * {@link #incrAndGet()} call that found the previous window expired.
 * Created by patterncat on 2016/4/20.
 */
public class LimitCounter {
    // volatile because incrAndGet() reads it outside the lock before deciding
    // whether to enter the synchronized block; without it the unsynchronized
    // read of a 64-bit value is a data race.
    private volatile long startTime;
    private final long timeIntervalInMs;
    private final int maxLimit;
    private final AtomicInteger currentCount;

    /**
     * @param timeIntervalInMs length of the counting window in milliseconds
     * @param maxLimit         value the count must exceed for
     *                         {@link #thresholdReached()} to return true
     */
    public LimitCounter(long timeIntervalInMs, int maxLimit) {
        this.timeIntervalInMs = timeIntervalInMs;
        this.maxLimit = maxLimit;
        this.startTime = System.currentTimeMillis();
        this.currentCount = new AtomicInteger(0);
    }


    /**
     * Increments the counter, first restarting the window (and zeroing the
     * count) if the current window has expired.
     *
     * @return the count after the increment
     */
    public int incrAndGet() {
        long currentTime = System.currentTimeMillis();
        if ((startTime + timeIntervalInMs) < currentTime) {
            synchronized (this) {
                // Re-check under the lock so that only one thread performs the reset.
                if ((startTime + timeIntervalInMs) < currentTime) {
                    startTime = currentTime;
                    currentCount.set(0);
                }
            }
        }
        return currentCount.incrementAndGet();
    }

    /**
     * @return true when the current count is strictly greater than the
     *         configured limit; the window is not re-evaluated here
     */
    public boolean thresholdReached(){
        return currentCount.get() > maxLimit;
    }

    /**
     * @return the current count, without touching the window
     */
    public int get(){
        return currentCount.get();
    }

    /**
     * Zeroes the count. The window start time is left unchanged.
     */
    public void reset(){
        currentCount.set(0);
    }
}

主要配置

/**
 * Mutable settings for a circuit breaker. Instances are obtained through
 * {@link #newDefault()} and then adjusted with the setters.
 */
public class CircuitBreakerConfig {

    private static final int DEFAULT_FAIL_THRESHOLD = 5;
    private static final int DEFAULT_FAIL_COUNT_WINDOW_MS = 60 * 1000;
    private static final int DEFAULT_OPEN_TO_HALF_OPEN_TIMEOUT_MS = 5 * 1000;
    private static final int DEFAULT_CONSECUTIVE_SUCC_THRESHOLD = 5;

    // Failure-count threshold used while the breaker is closed.
    private int failThreshold;

    // Length of the time window over which closed-state failures are counted.
    private int failCountWindowInMs;

    // How long the breaker stays open before it may move to half-open.
    private int open2HalfOpenTimeoutInMs;

    // Number of successes required in the half-open state.
    private int consecutiveSuccThreshold;

    // Not publicly instantiable; use newDefault().
    private CircuitBreakerConfig(){
        this.failThreshold = DEFAULT_FAIL_THRESHOLD;
        this.failCountWindowInMs = DEFAULT_FAIL_COUNT_WINDOW_MS;
        this.open2HalfOpenTimeoutInMs = DEFAULT_OPEN_TO_HALF_OPEN_TIMEOUT_MS;
        this.consecutiveSuccThreshold = DEFAULT_CONSECUTIVE_SUCC_THRESHOLD;
    }

    /**
     * @return a fresh configuration populated with the default values
     */
    public static CircuitBreakerConfig newDefault(){
        return new CircuitBreakerConfig();
    }

    public int getFailThreshold() {
        return this.failThreshold;
    }

    public void setFailThreshold(int failThreshold) {
        this.failThreshold = failThreshold;
    }

    public int getFailCountWindowInMs() {
        return this.failCountWindowInMs;
    }

    public void setFailCountWindowInMs(int failCountWindowInMs) {
        this.failCountWindowInMs = failCountWindowInMs;
    }

    public int getOpen2HalfOpenTimeoutInMs() {
        return this.open2HalfOpenTimeoutInMs;
    }

    public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) {
        this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs;
    }

    public int getConsecutiveSuccThreshold() {
        return this.consecutiveSuccThreshold;
    }

    public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) {
        this.consecutiveSuccThreshold = consecutiveSuccThreshold;
    }
}

断路器

/**
 * Holds the state of one named circuit breaker together with the counters
 * and timestamp that drive its transitions. The class only exposes the
 * transitions and threshold checks; deciding when to apply them is left to
 * the caller.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    private CircuitBreakerConfig config;

    private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;

    // Timestamp (ms) of the most recent transition into OPEN.
    private volatile long lastOpenedTime;

    // Failures counted while CLOSED, within the configured time window.
    private LimitCounter failCount ;

    // Consecutive successes while HALF_OPEN; zeroed on every entry into HALF_OPEN.
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);


    /**
     * @param name   key used to identify this breaker in log output
     * @param config supplies the failure window, failure threshold,
     *               open timeout and success threshold
     */
    public CircuitBreaker(String name,CircuitBreakerConfig config) {
        this.config = config;
        this.name = name;
        int windowInMs = config.getFailCountWindowInMs();
        int threshold = config.getFailThreshold();
        this.failCount = new LimitCounter(windowInMs, threshold);
    }

    // ---- state queries ----

    public boolean isOpen(){
        return state == CircuitBreakerState.OPEN;
    }

    public boolean isHalfOpen(){
        return state == CircuitBreakerState.HALF_OPEN;
    }

    public boolean isClosed(){
        return state == CircuitBreakerState.CLOSED;
    }

    // ---- state transitions ----

    /**
     * Moves to OPEN (from CLOSED or HALF_OPEN) and records the time of the
     * transition.
     */
    public void open(){
        this.lastOpenedTime = System.currentTimeMillis();
        this.state = CircuitBreakerState.OPEN;
        logger.debug("circuit open,key:{}",name);
    }

    /**
     * Moves from OPEN to HALF_OPEN, starting the consecutive-success count
     * from zero.
     */
    public void openHalf(){
        this.consecutiveSuccCount.set(0);
        this.state = CircuitBreakerState.HALF_OPEN;
        logger.debug("circuit open-half,key:{}",name);
    }

    /**
     * Moves from HALF_OPEN to CLOSED and clears the failure counter.
     */
    public void close(){
        this.failCount.reset();
        this.state = CircuitBreakerState.CLOSED;
        logger.debug("circuit close,key:{}",name);
    }

    // ---- threshold checks ----

    /**
     * Tells whether the configured open timeout has passed since the breaker
     * last opened. Only meaningful while the breaker is OPEN.
     *
     * @return true if the breaker should move to HALF_OPEN
     */
    public boolean isOpen2HalfOpenTimeout(){
        long now = System.currentTimeMillis();
        long cutoff = now - config.getOpen2HalfOpenTimeoutInMs();
        return cutoff > lastOpenedTime;
    }

    /**
     * @return true if the closed-state failure counter reports its threshold
     *         as reached, i.e. the breaker should move from CLOSED to OPEN
     */
    public boolean isCloseFailThresholdReached(){
        return this.failCount.thresholdReached();
    }

    /**
     * @return true if the consecutive-success count is at least the
     *         configured threshold, i.e. the breaker should move from
     *         HALF_OPEN to CLOSED
     */
    public boolean isConsecutiveSuccessThresholdReached(){
        int successes = this.consecutiveSuccCount.get();
        return successes >= config.getConsecutiveSuccThreshold();
    }

    // ---- counters and accessors ----

    /**
     * Records one failure in the windowed failure counter and logs the new count.
     */
    public void incrFailCount() {
        int updated = this.failCount.incrAndGet();
        logger.debug("incr fail count:{},key:{}",updated,name);
    }

    public AtomicInteger getConsecutiveSuccCount() {
        return this.consecutiveSuccCount;
    }

    public CircuitBreakerState getState() {
        return this.state;
    }
}

断路器维护的变量

    // Time of the most recent transition into the open state
    private volatile long lastOpenedTime;

    // Number of failures counted while in the closed state
    private LimitCounter failCount ;

    // Number of consecutive successes in the half-open state; a failure clears it immediately
    private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);

基于jdk代理的拦截

/**
 * JDK dynamic-proxy handler that routes calls to methods annotated with
 * {@code GuardByCircuitBreaker} through a {@code CircuitBreaker} looked up in
 * {@code CircuitBreakerRegister}. Methods without the annotation are passed
 * straight to the target.
 *
 * <p>Exceptions thrown by the target are unwrapped from the reflective
 * {@code InvocationTargetException} before being rethrown, so callers of the
 * proxy see the target's own exception.
 */
public class CircuitBreakerInvocationHandler implements InvocationHandler{

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class);

    private final Object target;

    /**
     * @param target the object whose interface methods are to be guarded
     */
    public CircuitBreakerInvocationHandler(Object target) {
        this.target = target;
    }

    /**
     * Creates the dynamic proxy for the target.
     *
     * @return a proxy implementing the interfaces returned by
     *         {@code target.getClass().getInterfaces()}, backed by this handler
     */
    public Object proxy(){
        return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this);
    }

    /**
     * Dispatches one proxied call according to the breaker's current state.
     *
     * @throws Throwable the target's own exception, or a
     *         {@code CircuitBreakerOpenException} when the call is rejected
     *         or the call's failure trips the breaker
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class);
        if(breakerAnno == null){
            try{
                return method.invoke(target,args);
            }catch (java.lang.reflect.InvocationTargetException e){
                // Rethrow what the target threw, not the reflective wrapper.
                throw unwrap(e);
            }
        }
        // NOTE(review): the annotation's timeoutInMs() was read but never applied
        // by the original code; it is still not applied here - confirm intent.
        Class<? extends Throwable>[] noTripExs = breakerAnno.noTripExceptions();
        CircuitBreaker breaker = resolveBreaker(method,breakerAnno);

        logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString());

        // Read the state exactly once: re-reading it per branch lets a concurrent
        // transition skip every branch and return null without calling the target.
        switch (breaker.getState()){
            case OPEN:
                if(!breaker.isOpen2HalfOpenTimeout()){
                    throw new CircuitBreakerOpenException(method.toGenericString());
                }
                // The open timeout has elapsed: enter half-open and let this call through as a trial.
                breaker.openHalf();
                logger.debug("method:{} into half open",method.toGenericString());
                return processHalfOpen(breaker,method,args,noTripExs);
            case CLOSED:
                return processClosed(breaker,method,args,noTripExs);
            default:
                // HALF_OPEN
                return processHalfOpen(breaker,method,args,noTripExs);
        }
    }

    /**
     * Returns the breaker registered for this target/method, creating and
     * registering one from the annotation's settings if none exists yet.
     */
    private CircuitBreaker resolveBreaker(Method method,GuardByCircuitBreaker breakerAnno){
        String key = target.getClass().getSimpleName() + method.getName();
        CircuitBreaker breaker = CircuitBreakerRegister.get(key);
        if(breaker != null){
            return breaker;
        }
        // Build the config only when a breaker actually has to be created.
        CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault();
        int interval = breakerAnno.failCountWindowInMs();
        if(interval != -1){
            cfg.setFailCountWindowInMs(interval);
        }
        int failThreshold = breakerAnno.failThreshold();
        if(failThreshold != -1){
            cfg.setFailThreshold(failThreshold);
        }
        CircuitBreaker created = new CircuitBreaker(key,cfg);
        CircuitBreakerRegister.putIfAbsent(key,created);
        // Another thread may have registered first; use whichever instance is
        // actually in the register so all callers share the same breaker.
        CircuitBreaker registered = CircuitBreakerRegister.get(key);
        return registered != null ? registered : created;
    }

    /**
     * Invokes the target while the breaker is closed, counting tripping
     * failures and opening the breaker once the failure threshold is reached.
     */
    private Object processClosed(CircuitBreaker breaker,Method method, Object[] args,Class<? extends Throwable>[] noTripExs) throws Throwable {
        try{
            return method.invoke(target,args);
        }catch (Throwable t){
            Throwable cause = unwrap(t);
            if(isNoTripException(cause,noTripExs)){
                throw cause;
            }
            breaker.incrFailCount();
            if(breaker.isCloseFailThresholdReached()){
                logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString());
                breaker.open();
                // Keep the triggering failure as the cause, as the half-open path does.
                throw new CircuitBreakerOpenException(method.toGenericString(), cause);
            }
            throw cause;
        }
    }

    /**
     * Invokes the target while the breaker is half-open. A success or a
     * non-tripping exception counts towards closing the breaker; any other
     * failure reopens it.
     */
    private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class<? extends Throwable>[] noTripExs) throws Throwable {
        try{
            Object returnValue = method.invoke(target,args);
            recordHalfOpenSuccess(breaker);
            return returnValue;
        }catch (Throwable t){
            Throwable cause = unwrap(t);
            if(isNoTripException(cause,noTripExs)){
                recordHalfOpenSuccess(breaker);
                throw cause;
            }
            breaker.open();
            throw new CircuitBreakerOpenException(method.toGenericString(), cause);
        }
    }

    /**
     * Counts one half-open success and closes the breaker once the
     * consecutive-success threshold is reached.
     */
    private void recordHalfOpenSuccess(CircuitBreaker breaker){
        breaker.getConsecutiveSuccCount().incrementAndGet();
        if(breaker.isConsecutiveSuccessThresholdReached()){
            breaker.close();
        }
    }

    /**
     * Returns the exception thrown by the target when {@code t} is the
     * reflective wrapper around it; otherwise returns {@code t} itself (for
     * example an {@code IllegalArgumentException} raised by the reflection
     * call, which has no cause).
     */
    private static Throwable unwrap(Throwable t){
        if(t instanceof java.lang.reflect.InvocationTargetException && t.getCause() != null){
            return t.getCause();
        }
        return t;
    }

    /**
     * Tells whether {@code cause} is an instance of any configured
     * non-tripping exception type.
     *
     * @param cause            the already-unwrapped failure
     * @param noTripExceptions exception types that must not trip the breaker; may be null or empty
     */
    private boolean isNoTripException(Throwable cause,Class<? extends Throwable>[] noTripExceptions){
        if(noTripExceptions == null || noTripExceptions.length == 0){
            return false;
        }
        for(Class<? extends Throwable> ex:noTripExceptions){
            if(ex.isInstance(cause)){
                return true;
            }
        }
        return false;
    }
}

github工程circuit-breaker

参考