资讯专栏INFORMATION COLUMN

Spring Cloud Ribbon

fasss / 1702人阅读

摘要:客户端负载均衡需要客户端自己维护自己要访问的服务实例清单,这些服务清单来源于注册中心在使用进行服务治理时。使用从负载均衡器中挑选出的服务实例来执行请求内容。

客户端负载均衡Spring Cloud Ribbon

 Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,基于Netflix Ribbon实现。

目录

客户端负载均衡(本文重点)

源码分析(本文重点)

负载均衡器

负载均衡策略

配置详解

自动化配置

客户端负载均衡

 负载均衡是对系统的高可用、网络压力的缓解和处理内容扩容的重要手段之一。

 负载均衡可以分为客户端负载均衡和服务端负载均衡。

 负载均衡按设备来分为硬件负载均衡和软件负载均衡,都属于服务端负载均衡。

 硬件负载均衡主要通过在服务器节点之间安装专门用于负载均衡的设备,例如F5等。

 软件负载均衡通过在服务器上安装一些具有负载均衡功能或模块的软件来完成请求的转发工作,例如Nginx等。

 硬件负载均衡和软件负载均衡都会维护一个可用的服务清单,然后通过心跳检测来剔除故障节点以保证服务清单中的节点都正常可用。当客户端发出请求时,负载均衡器会按照某种算法(线性轮询、按权重负载、按流量负载等)从服务清单中取出一台服务器的地址,然后将请求转发到该服务器上。

 客户端负载均衡需要客户端自己维护自己要访问的服务实例清单, 这些服务清单来源于注册中心(在使用Eureka进行服务治理时)。

基于Spring Cloud Ribbon实现客户端负载均衡

 基于Spring Cloud Ribbon实现客户端负载均衡非常简单,主要由以下步骤:

服务提供者需要启动多个服务实例并注册到一个或多个相关联的服务注册中心上;

服务消费者直接通过带有@LoadBalanced注解的RestTemplate向服务提供者发送请求以实现客户端的负载均衡。

源码分析

 既然Ribbon客户端负载均衡需要为RestTemplate增加@LoadBalanced注解,那么下面我们就从这个注解开始分析。

@LoadBalanced

 通过该注解的头部注释可以得知,该注解的作用是使用LoadBalancerClient来对RestTemplate进行配置,下面接着看LoadBalancerClient

LoadBalancerClient

 该类是一个接口类,代码如下:

package org.springframework.cloud.client.loadbalancer;

import org.springframework.cloud.client.ServiceInstance;

import java.io.IOException;
import java.net.URI;

public interface LoadBalancerClient extends ServiceInstanceChooser {
    
    /**
     * 注意该方法是从父类ServiceInstanceChooser接口中继承过来的
     */
    ServiceInstance choose(String serviceId);

     T execute(String serviceId, LoadBalancerRequest request) throws IOException;

     T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;

    URI reconstructURI(ServiceInstance instance, URI original);
}

 从接口的方法和注释可以看出,一个客户端负载均衡器应该具有以下几种重要功能:

ServiceInstance choose(String serviceId) : 根据传入的服务名称,从负载均衡器中挑选一个对应服务的实例。

T execute(String serviceId, LoadBalancerRequest request) throws IOException : 使用从负载均衡器中挑选出的服务实例来执行请求内容。

URI reconstructURI(ServiceInstance instance, URI original) : 构建一个符合host:port格式的URI。在分布式系统中,我们都使用逻辑上的服务名作为host来构建URI(代替服务实例的host:port形式)进行请求。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,后者URI对象则是使用逻辑服务名的URI,返回的URI是根据这两者转换后的host:port形式的URI。

 通过分析org.springframework.cloud.client.loadbalancer包中的类,可以找出org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration是实现客户端负载均衡的自动配置类

LoadBalancerAutoConfiguration

 从LoadBalancerAutoConfiguration类的头部注解上可以看出,Ribbon实现客户端负载均衡的自动配置需要满足下面两个条件:

@ConditionalOnClass(RestTemplate.class) : RestTemplate类必须存在于当前工程的环境中

@ConditionalOnBean(LoadBalancerClient.class) : 在Spring的Bean工程中必须要有LoadBalancerClient的实现Bean

 在该自动化配置的任务中,主要完成以下三个任务:

创建一个LoadBalancerInterceptor实例,用于客户端发起请求时进行拦截,进而实现客户端的负载均衡

创建一个RestTemplateCustomizer实例,用于给RestTemplate增加LoadBalancerInterceptor拦截器

维护一个被@LoadBalanced注解修饰的RestTemplate集合(List),并在这里进行初始化,通过调用RestTemplateCustomizer实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器

 通过上面可以看出,真正实现客户端负载均衡是因为有LoadBalancerInterceptor拦截器的存在,那么下面看一下LoadBalancerInterceptor类

LoadBalancerInterceptor
package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.util.List;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpResponse;

public class LoadBalancerRequestFactory {

    private LoadBalancerClient loadBalancer;
    private List transformers;

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
            List transformers) {
        this.loadBalancer = loadBalancer;
        this.transformers = transformers;
    }

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    public LoadBalancerRequest createRequest(final HttpRequest request,
            final byte[] body, final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 拦截器的intercept方法还用到另外两个类LoadBalancerRequestFactory和ServiceRequestWrapper。

 当使用被@LoadBalanced注解的RestTemplate发送请求时,会被LoadBalancerInterceptor拦截器拦截,执行LoadBalancerInterceptor的intercept方法,最终在该方法中选择合适的服务实例通过LoadBalancerClient的execute方法进行调用。

 因为LoadBalancerClient是抽象的负载均衡器接口,下面我们可以通过一个具体的负载均衡器RibbonLoadBalancerClient来进行分析。

RibbonLoadBalancerClient
package org.springframework.cloud.netflix.ribbon;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import static org.springframework.cloud.netflix.ribbon.RibbonUtils.updateToSecureConnectionIfNeeded;

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public  T execute(String serviceId, LoadBalancerRequest request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public  T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

}

 分析execute(String serviceId, LoadBalancerRequest request)方法的执行步骤:

 1.获取负载均衡器,默认的负载均衡器是ZoneAwareLoadBalancer,这个后面再讲

 2.获取具体的服务实例,这里并没有调用LoadBalancerClient的ServiceInstance choose(String serviceId)方法,而是调用的ILoadBalancer的Server chooseServer(Object key)方法来获取具体的服务实例,想知道ILoadBalancer干啥用,请看下面的接口介绍

 3.将获取到的具体服务实例包装成RibbonServer对象(该对象存储了服务实例信息、服务名serviceId、是否需要https等其他信息),最后调用LoadBalancerRequest的apply方法,向一个实际的的具体服务实例发起请求,请看下面LoadLoadBalancerRequest的分析

ILoadBalancer
package com.netflix.loadbalancer;

import java.util.List;

public interface ILoadBalancer {

    public void addServers(List newServers);
    
    public Server chooseServer(Object key);
    
    public void markServerDown(Server server);
    
    public List getReachableServers();

    public List getAllServers();
}

 该类也是负载均衡器的一个抽象接口,主要定义了一系列的抽象操作:

void addServers(List newServers) : 向负载均衡器维护的服务实例列表中增加服务实例

Server chooseServer(Object key) : 根据某种负载策略,从负载均衡器中挑选一个具体的服务实例

void markServerDown(Server server) : 通知和标识负载均衡器中的某个具体的服务实例已经停止服务,防止负载均衡器在下一次获取服务实例清单前认为该服务实例是正常服务

List getReachableServers() : 获取当前正常的服务实例列表

List getAllServers() : 获取所有已知的服务实例列表

 该接口中使用到的Server对象定义是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括host、port以及一些部署信息等。

 整理该接口的实现类主要有:

AbstractLoadBalancer(抽象类)

BaseLoadBalancer:继承自AbstractLoadBalancer

DynamicServerListLoadBalancer:继承自BaseLoadBalancer

NoOpLoadBalancer:继承自AbstractLoadBalancer

ZoneAwareLoadBalancer:继承自DynamicServerListLoadBalancer

 默认使用的负载均衡器是ZoneAwareLoadBalancer,要想知道这个结果很简单,查看一下RibbonClientConfiguration配置类,该类中有一个方法如下:

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList serverList, ServerListFilter serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

 此处的代码逻辑如果没有配置负载均衡器,那么默认的负载均衡器就是ZoneAwareLoadBalancer。

LoadLoadBalancerRequest

 该类是一个抽象的接口,只有一个apply(ServiceInstance instance)方法,先看一下ServiceInstance类。

ServiceInstance

 该接口暴露了服务实例的一些基本信息,如:serviceId、port、host等,源码如下:

package org.springframework.cloud.client;

import java.net.URI;
import java.util.Map;

public interface ServiceInstance {

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map getMetadata();
    
    default String getScheme() {
        return null;
    }
}

 上面提及的RibbonServer是ServiceInstance的一个具体实现,查看源码可以知道RibbonServer除了包含Server对象之外,还存储了服务名、是否使用HTTPS以及一个Map类型的元数据集合。

 看完上面两个类,我们来看一下具体的apply方法的实现,源码中使用Lamada表达式实现,下面我只抽出具体的功能实现的源码如下:

            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);

 在apply方法中,首先将HttpRequest包装成ServiceRequestWrapper对象,下面看一下ServiceRequestWrapper类,源码如下:

package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 在ServiceRequestWrapper类中重写了getURI方法,重写后的该方法通过调用LoadBalancerClient中的reconstructURI方法来构建一个host:port形式的URI对外发起请求。

 在apply方法的最后在调用ClientHttpRequestExecution的execute方法时,实际会去执行InterceptingClientHttpRequest类下面的InterceptingRequestExecution的execute方法,该方法的具体代码如下:

        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

 分析上面的代码可以看出,在调用requestFactory.createRequest(request.getURI(), method)创建请求时会调用getURI方法,此时调用的getURI方法就是ServiceRequestWrapper类中的getURI方法,进而调用LoadBalancerClient中的reconstructURI方法,下面我们看一下RibbonLoadBalancerClient中的reconstructURI是如何实现的

    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }

 分析上面的代码,首先根据serviceId从SpringClientFactory对象中获取serviceId对应的负载均衡器的上下文RibbonLoadBalancerContext对象,然后再使用该上下文对象的reconstructURIWithServer方法和server对象来构建具体的URI。

 备注:

SpringClientFactory : 一个用来创建客户端负载均衡器的工厂类,该工厂类会为每一个不同名的Ribbon客户端生成不同的上下文

RibbonLoadBalancerContext : LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和API操作。

 关于LoadBalancerContext类中的reconstructURIWithServer方法是如何组装host:port形式的逻辑很容易理解,我们就不在这里解释了,有兴趣的可以自己去看一下。

小结

 使用被@LoadBalanced注解的RestTemplate发起请求时,会被LoadBalancerInterceptor拦截,然后借助负载均衡器LoadBalancerClient将逻辑服务名转换为host:port的具体的服务实例地址,在使用RibbonLoadBalancerClient(Ribbon实现的负载均衡器)时实际使用的是Ribbon中定义的ILoadBalancer,默认自动化配置的负载均衡器是ZoneAwareLoadBalancer。

代码地址

https://gitee.com/petterheng/spring-cloud-eureka

后续

后面会介绍负载均衡器的源码分析,请继续关注!!!

 前往微信公众号阅读文章,点击这里,或者直接扫码关注公众号

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76726.html

相关文章

  • Spring Cloud实战(三)-Spring Cloud Netflix Ribbon

    摘要:概要什么是实战整合实现负载均衡是什么是一个客户端负载均衡的组件什么是负载均衡负载均衡就是分发请求流量到不同的服务器目前的实现有软件和硬件负载均衡分为两种服务器端负载均衡如上图所示服务器端负载均衡是对客户透明的用户请求到服务器真正的服务器是由 概要 什么是Spring Cloud Netflix Ribbon? 实战:整合Ribbon实现负载均衡 Spring Cloud Netfl...

    wangbinke 评论0 收藏0
  • 一起学习使用Spring Cloud Netflix之Ribbon

    摘要:本例中介绍如何使用来完成服务调用并实现负载均衡。即,对于注册中心而言,生产者和调用者都是端。文件配置如下在文件中,我们将应用命名为,端口为,表示注册中心地址。 前言 Ribbon是Spring Cloud体系中完成负载均衡的重要组件。Spring Cloud体系中有两种完成服务调用的组件,一种是Ribbon+RestTemplate,另一种Feign。Feign默认使用的也是Ribbo...

    nidaye 评论0 收藏0
  • Spring Cloud 参考文档(客户端负载均衡器:Ribbon

    摘要:客户端负载均衡器是一个客户端负载均衡器,可以让你对和客户端的行为进行大量控制,已经使用了,因此,如果你使用,此部分也适用。 客户端负载均衡器:Ribbon Ribbon是一个客户端负载均衡器,可以让你对HTTP和TCP客户端的行为进行大量控制,Feign已经使用了Ribbon,因此,如果你使用@FeignClient,此部分也适用。 Ribbon中的一个核心概念是命名客户端,每个负载均...

    Songlcy 评论0 收藏0
  • 史上最简单的SpringCloud教程 | 第二篇: 服务消费者(rest+ribbon

    摘要:在服务架构中,业务都会被拆分成一个独立的服务,服务与服务的通讯是基于的。配置文件如下在工程的启动类中通过向服务中心注册并且注册了一个通过注册表明,这个是负载均衡的。 转载请标明出处: http://blog.csdn.net/forezp/a...本文出自方志朋的博客 在上一篇文章,讲了服务的注册和发现。在服务架构中,业务都会被拆分成一个独立的服务,服务与服务的通讯是基于http re...

    dreamans 评论0 收藏0
  • 7、服务发现&服务消费者Ribbon

    摘要:在服务注册服务提供者这一篇可能学习了这么开发一个服务提供者,在生成上服务提供者通常是部署在内网上,即是服务提供者所在的服务器是与互联网完全隔离的。服务消费者本质上也是一个。 在《服务注册&服务提供者》这一篇可能学习了这么开发一个服务提供者,在生成上服务提供者通常是部署在内网上,即是服务提供者所在的服务器是与互联网完全隔离的。这篇说下服务发现(服务消费者),通常服务消费者是部署在与互联网...

    tangr206 评论0 收藏0
  • Spring Cloud 体验

    摘要:多层服务调用常见于微服务架构中较底层的服务如果出现故障,会导致连锁故障。 Spring Cloud 体验 简介 Spring Cloud为开发人员提供了快速构建分布式系统的一些工具,包括配置管理、服务发现、断路器、路由、微代理、 事件总线、全局锁、决策竞选、分布式会话等等 基于Spring Boot,Spring Cloud将各公司成熟服务框架组合起来,通过Spring Boo...

    NotFound 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<