SpringCloud升级之路2020.0.x版-41.

本系列代码地址:​​https://github.com/JoJoTec/spring-cloud-parent​

我们继续分析上一节提到的 ​​WebHandler​​。加入 Spring Cloud Sleuth 以及 Prometheus 相关依赖之后, Spring Cloud Gateway 的处理流程如下所示:

SpringCloud升级之路2020.0.x版-41.

Spring Cloud Gateway 入口 -> WebFlux 的 DefaultWebFilterChain

Spring Cloud Gateway 是基于 Spring WebFlux 开发的异步响应式网关,异步响应式代码比较难以理解和阅读,我这里给大家分享一种方法去理解,通过这个流程来理解 Spring Cloud Gateway 的工作流程以及底层原理。其实可以理解为,上图这个流程,就是拼出来一个完整的 Mono(或者 Flux)流,最后 subscribe 执行。

当收到一个请求的时候,会经过 ​​org.springframework.web.server.handler.DefaultWebFilterChain​​,这是 WebFilter 的调用链,这个链路包括三个 WebFilter:

  • ​org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter​​:添加 Prometheus 相关依赖之后,会有这个 MetricsWebFilter,用于记录请求处理耗时,采集相关指标。
  • ​org.springframework.cloud.sleuth.instrument.web.TraceWebFilter​​:添加 Spring Cloud Sleuth 相关依赖之后,会有这个 TraceWebFilter。
  • ​org.springframework.cloud.gateway.handler.predicate.WeightCalculatorWebFilter​​:Spring Cloud Gateway 路由权重相关配置功能相关实现类,这个我们这里不关心。

在这个 ​​DefaultWebFilterChain​​ 会形成这样一个 Mono,我们依次将他们标记出来,首先是入口代码 ​​org.springframework.web.server.handler.DefaultWebFilterChain#filter​​:

public Mono filter(ServerWebExchange exchange) {
return Mono.defer(() ->
// this.currentFilter != null 代表 WebFilter 链还没有结束
// this.chain != null 代表 WebFilter 链不为空
this.currentFilter != null && this.chain != null ?
//在 WebFilter 链没有结束的情况下,调用 WebFilter
invokeFilter(this.currentFilter, this.chain, exchange) :
//在 WebFilter 结束的情况下,调用 handler
this.handler.handle(exchange));
}

对于我们这里的 WebFilter 链的第一个 ​​MetricsWebFilter​​,假设启用了对应的采集统计的话,这时候生成的 Mono 就是:

return Mono.defer(() ->
chain.filter(exchange).transformDeferred((call) -> {
long start = System.nanoTime();
return call
//成功时,记录响应时间
.doOnSuccess((done) -> MetricsWebFilter.this.onSuccess(exchange, start))
//失败时,记录响应时间和异常
.doOnError((cause) -> MetricsWebFilter.this.onError(exchange, start, cause));
});
);

这里为了方便,我们对代码做了简化,由于我们要将整个链路的所有 Mono 和 Flux 拼接在一起行程完整链路,所以原本是 ​​MetricsWebFilter​​中的 ​​onSuccess(exchange, start)​​方法,被改成了 ​​MetricsWebFilter.this.onSuccess(exchange, start)​​ 这种伪代码。

接着,根据​​DefaultWebFilterChain​​ 的源码分析,​​chain.filter(exchange)​​ 会继续 WebFilter 链路,到达下一个 WebFilter,即 ​​TraceWebFilter​​。经过 ​​TraceWebFilter​​,Mono 就会变成:

return Mono.defer(() ->
new MonoWebFilterTrace(source, chain.filter(exchange), TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

可以看出,在 ​​TraceWebFilter​​ 中,整个内部 Mono (​​chain.filter(exchange)​​ 后续的结果)都被封装成了一个 ​​MonoWebFilterTrace​​,这也是保持链路追踪信息的关键实现。

继续 WebFilter 链路,经过最后一个 WebFilter ​​WeightCalculatorWebFilter​​; 这个 WebFilter 我们不关心,里面对路由权重做了一些计算操作,我们这里直接忽略即可。这样我们就走完了所有 WebFilter 链路,来到了最后的调用 ​​DefaultWebFilterChain.this.handler​​,这个 handler 就是 ​​org.springframework.web.reactive.DispatcherHandler​​。在 DispatcherHandler 中,我们会计算出路由并发送请求到符合条件的 GatewayFilter。经过 DispatcherHandler,Mono 会变成:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
Flux.fromIterable(DispatcherHandler.this.handlerMappings) //读取所有的 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange)) //按顺序调用所有的 handlerMappings 的 getHandler 方法,如果有对应的 Handler 会返回,否则返回 Mono.empty();
.next() //找到第一个返回不是 Mono.empty() 的 Handler
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

handlerMappings 包括:

  • ​org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndPointHandlerMapping​​:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。Actuator 相关路径映射,不是我们这里关心的。但是可以看出,Actuator 相关路径优先于 Spring Cloud Gateway 配置路由
  • ​org.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMapping​​:由于我们项目中添加了 Actuator 相关依赖,所以这里有这个 HandlerMapping。使用 @ControllerEndpoint 或者 @RestControllerEndpoint 注解标注的 Actuator 相关路径映射,不是我们这里关心的。
  • ​org.springframework.web.reactive.function.server.support.RouterFunctionMapping​​:在 Spring-WebFlux 中,你可以定义很多不同的 RouterFunction 来控制路径路由,但这也不是我们这里关心的。但是可以看出,自定义的 RouterFunction 会优先于 Spring Cloud Gateway 配置路由
  • ​org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping​​:针对 @RequestMapping 注解的路径的 HandlerMapping,不是我们这里关心的。但是可以看出,如果你在 Spring Cloud Gateway 中指定 RequestMapping 路径,会优先于 Spring Cloud Gateway 配置路由
  • ​org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping​​:这个是 Spring Cloud Gateway 的 HandlerMapping,会读取 Spring Cloud Gateway 配置并生成路由。这个是我们这里要详细分析的。

其实这些 handlerMappings,我们这里肯定走的是 ​​RoutePredicateHandlerMapping​​ 的相关逻辑,所以我们的 Mono 又可以简化成:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandler(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

我们来看 ​​RoutePredicateHandlerMapping​​,首先这些 handlerMapping 都是继承了抽象类 ​​org.springframework.web.reactive.handler.AbstractHandlerMapping​​, 前面我们拼接的 Mono 里面的 getHandler 的实现其实就在这个抽象类中:

public Mono getHandler(ServerWebExchange exchange) {
//调用抽象方法 getHandlerInternal 获取真正的 Handler
return getHandlerInternal(exchange).map(handler -> {
//这里针对 handler 做一些日志记录
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
// 跨域处理
ServerHttpRequest request = exchange.getRequest();
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
return handler;
});
}

可以看出,其实核心就是每个实现类的 ​​getHandlerInternal(exchange)​​方法,所以在我们拼接的 Mono 中,我们会忽略抽象类中的针对 handler 之后的 map 处理。

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.getHandlerInternal(exchange)
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

接下来经过 ​​RoutePredicateHandlerMapping​​ 的 ​​getHandlerInternal(exchange)​​ 方法,我们的 Mono 变成了:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty() //返回 Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.flatMap(handler -> DispatcherHandler.this.invokeHandler(exchange, handler)) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

​RoutePredicateHandlerMapping.this.lookupRoute(exchange)​​ 根据请求寻找路由,这个我们就不详细展开了,其实就是根据你的 Spring Cloud Gateway 配置,找到合适的路由。接下来我们来看调用对应的 Handler,即 FilteringWebHandler。​​DispatcherHandler.this.invokeHandler(exchange, handler)​​ 我们这里也不详细展开,我们知道其实就是调用 Handler 的 handle 方法,即 FilteringWebHandler 的 handle 方法,所以 我们的 Mono 变成了:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty())) //调用对应的 Handler
.flatMap(result -> DispatcherHandler.this.handleResult(exchange, result)), //处理结果
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

由于调用对应的 Handler,最后返回的是 ​​Mono.empty()​​,所以后面的 flatMap 其实不会执行了。所以我们可以将最后的处理结果这一步去掉。所以我们的 Mono 就变成了:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(FilteringWebHandler.this.handle(exchange).then(Mono.empty()))), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

​FilteringWebHandler.this.handle(exchange)​​ 其实就是从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters,与全局 GatewayFilters 放到同一个 List 中,并按照这些 GatewayFilter 的顺序排序(可以通过实现 ​​org.springframework.core.Ordered​​ 接口来制定顺序),然后生成 ​​DefaultGatewayFilterChain​​ 即 GatewayFilter 链路。对应的源码是:

public Mono handle(ServerWebExchange exchange) {
//从 Attributes 中取出路由,从路由中取出对应的 GatewayFilters
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List gatewayFilters = route.getFilters();
//与全局 GatewayFilters 放到同一个 List 中
List combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
//按照这些 GatewayFilter 的顺序排序(可以通过实现 `org.springframework.core.Ordered` 接口来制定顺序)
AnnotationAwareOrderComparator.sort(combined);

if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
//生成调用链
return new DefaultGatewayFilterChain(combined).filter(exchange);
}

这个 GatewayFilter 调用链和 WebFilter 调用链类似,参考 ​​DefaultGatewayFilterChain​​ 的源码:

public Mono filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
//如果链路没有结束,则继续链路
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式
return filter.filter(exchange, chain);
}
else {
//到达末尾,链路结束
return Mono.empty(); // complete
}
});
}

所以,经过 ​​DefaultGatewayFilterChain​​ 后,我们的 Mono 就会变成:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(new DefaultGatewayFilterChain(combined).filter(exchange).then(Mono.empty()))), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

再继续展开 ​​DefaultGatewayFilterChain​​ 的链路调用,可以得到:

return Mono.defer(() ->
new MonoWebFilterTrace(source,
RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
.flatMap((Function>) r -> {
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
}).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
Mono.empty()
.then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})))
.switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
.then(
Mono.defer(() -> {
//如果链路没有结束,则继续链路
if (DefaultGatewayFilterChain.this.index < DefaultGatewayFilterChain.this.filters.size()) {
GatewayFilter filter = DefaultGatewayFilterChain.this.filters.get(DefaultGatewayFilterChain.this.index);
//这里将 index + 1,也就是调用链路中的下一个 GatewayFilter
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(DefaultGatewayFilterChain.this, DefaultGatewayFilterChain.this.index + 1);
//每个 filter 中如果想要继续链路,则会调用 chain.filter(exchange),这也是我们开发 GatewayFilter 的时候的使用方式
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); //链路完成
}
})
.then(Mono.empty()))
), //调用对应的 Handler
TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
//MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
});
);

这样,就形成了 Spring Cloud Gateway 针对路由请求的完整 Mono 调用链。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

SpringCloud升级之路2020.0.x版-41.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/125259.html

相关文章

  • SpringCloud升级之路2020.0.x-41. SpringCloudGateway 基本

    摘要:在这里,会将上下文中载入的拼接成,然后调用其方法的,它是的处理请求业务的起点。添加相关依赖之后,会有这个。路由权重相关配置功能相关实现类,这个我们这里不关心。本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent我们继续分析上一节提到的 WebHandler,经过将请求封装成 ServerWebExchange 的 HttpWebHand...

    不知名网友 评论0 收藏0
  • SpringCloud升级之路2020.0.x-41. SpringCloudGateway 基本

    摘要:在这里,会将上下文中载入的拼接成,然后调用其方法的,它是的处理请求业务的起点。添加相关依赖之后,会有这个。路由权重相关配置功能相关实现类,这个我们这里不关心。 本系列代码地址:​​https://github.com/JoJoTec/spring-cloud-parent​​我们继续分析上一节提到的 ​​WebHandle...

    番茄西红柿 评论0 收藏2637
  • SpringCloud升级之路2020.0.x-41. SpringCloudGateway 基本

    摘要:将请求封装成将请求封装成的接口定义是但是最外层传进来的参数是和,需要将他们封装成,这个工作就是在中做的。其实主要任务就是将各种参数封装成除了和本次请求相关的和,还有会话管理器,编码解码器配置,国际化配置还有用于扩展。本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent接下来,将进入我们升级之路的又一大模块,即网关模块。网关模块我们废弃了...

    不知名网友 评论0 收藏0
  • SpringCloud升级之路2020.0.x-41. SpringCloudGateway 基本

    摘要:添加相关依赖之后,会有这个。接着,根据的源码分析,会继续链路,到达下一个,即。在中,我们会计算出路由并发送请求到符合条件的。这个是的,会读取配置并生成路由。本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent我们继续分析上一节提到的 WebHandler。加入 Spring Cloud Sleuth 以及 Prometheus 相关依赖...

    不知名网友 评论0 收藏0
  • SpringCloud升级之路2020.0.x-37. 实现异步的客户端封装配置管理的意义与设计

    摘要:对于异步的请求,使用的是异步客户端即。要实现的配置设计以及使用举例要实现的配置设计以及使用举例首先,我们要实现的,其包含三个重试重试的要在负载均衡之前,因为重试的时候,我们会从负载均衡器获取另一个实例进行重试,而不是在同一个实例上重试多次。 本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent 为何需要封装异步 HT...

    fxp 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<