如果想直接查看修改部分请跳转 动手-点击跳转
本文基于 ReactiveLoadBalancerClientFilter使用RoundRobinLoadBalancer
灰度发布
灰度发布,又称为金丝雀发布,是一种新旧版本平滑发布的方式。在上面可以对同一个API进行两个版本 的内容,由一部分用户先行体验,如无问题,逐步扩大发布范围
本文将讲述如何基于基于nacos的matedata与Ribbon如何去做灰度发布
重点知识
Spring Cloud Gateway两种负载均衡器
官网说明两种负载均衡器
Gateway有两种客户端负载均衡器,LoadBalancerClientFilter和ReactiveLoadBalancerClientFilter。
LoadBalancerClientFilter使用一个Ribbon的阻塞式LoadBalancerClient,Gateway建议使用ReactiveLoadBalancerClientFilter。
可以通过设置spring.cloud.loadbalancer.ribbon.enabled=false,切换到ReactiveLoadBalancerClientFilter。无论使用Ribbon还是LoadBalancer,在Route中配置的lb是一样的
本节采用 ReactiveLoadBalancerClientFilter 进行设置
采用ReactiveLoadBalancerClientFilter使用RoundRobinLoadBalancer
灰度发布服务器选择 简单示意图
Client —-> gateway —-> GlobalFilter 拦截 选择一个灰度发布服务器 如果没有灰度服务则选取正常服务器 —->转发到服务
nacos的matedata
我们在向 Nacos Server 进行服务注册的时候往往会附加一些 metadata ,可以参考官方文档中 Dubbo 融合 Nacos 成为注册中心 章节。
充分利用好服务实例的 metadata ,可以衍生出许多有意思的实践。
完全可以把相关内容放进 metadata 中,好比说版本号,特性名等等
然后再根据负载均衡路由到不同的服务
1 2
  
 | 
spring.cloud.nacos.discovery.metadata.version=1.15 spring.cloud.nacos.discovery.metadata.advance=true
  
 | 
准备工作
nacos 部署
gateway 部署 -可以参考
部署两台服务A
开始
跟踪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  
 | 
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {     URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);     String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);     if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {         ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);         if (log.isTraceEnabled()) {             log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);         }
          return this.choose(exchange).doOnNext((response) -> {             if (!response.hasServer()) {                 throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());             } else {                 ServiceInstance retrievedInstance = (ServiceInstance)response.getServer();                 URI uri = exchange.getRequest().getURI();                 String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";                 if (schemePrefix != null) {                     overrideScheme = url.getScheme();                 }
                  DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);                 URI requestUrl = this.reconstructURI(serviceInstance, uri);                 if (log.isTraceEnabled()) {                     log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);                 }
                  exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);             }         }).then(chain.filter(exchange));     } else {         return chain.filter(exchange);     } }
  protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {     return LoadBalancerUriTools.reconstructURI(serviceInstance, original); }
  private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {     URI uri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);     ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);     if (loadBalancer == null) {         throw new NotFoundException("No loadbalancer available for " + uri.getHost());     } else {         return loadBalancer.choose(this.createRequest());     } }
 
 | 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
  
 | 
@SuppressWarnings("rawtypes") @Override // see original // https://github.com/Netflix/ocelli/blob/master/ocelli-core/ // src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java public Mono<Response<ServiceInstance>> choose(Request request) { 	// TODO: move supplier to Request? 	// Temporary conditional logic till deprecated members are removed. 	if (serviceInstanceListSupplierProvider != null) { 		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider 				.getIfAvailable(NoopServiceInstanceListSupplier::new); 		return supplier.get().next().map(this::getInstanceResponse); 	} 	ServiceInstanceSupplier supplier = this.serviceInstanceSupplier 			.getIfAvailable(NoopServiceInstanceSupplier::new); 	return supplier.get().collectList().map(this::getInstanceResponse); }
  private Response<ServiceInstance> getInstanceResponse( 		List<ServiceInstance> instances) { 	if (instances.isEmpty()) { 		log.warn("No servers available for service: " + this.serviceId); 		return new EmptyResponse(); 	} 	// TODO: enforce order? 	int pos = Math.abs(this.position.incrementAndGet());
  	ServiceInstance instance = instances.get(pos % instances.size());
  	return new DefaultResponse(instance); }
 
 | 
通过代码跟踪 ReactiveLoadBalancerClientFilter 与 RoundRobinLoadBalancer 可以发现,最终 我们只需要对 getInstanceResponse 进行改造 即可满足所有需要
动手!
开始修改代码
我们只需要新增一个 GlobalFilter 在 AdvanceReactiveLoadBalancerClientFilter 执行之前 ,并且对LoadBalancer 的getInstanceResponse 做一下稍微改造就OK了
复制 RoundRobinLoadBalancer 内容 并修改 getInstanceResponse() 逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
  
 | 
package top.lingma.gateway.loadbalancer;
  import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse; import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse; import org.springframework.cloud.client.loadbalancer.reactive.Request; import org.springframework.cloud.client.loadbalancer.reactive.Response; import org.springframework.cloud.loadbalancer.core.*; import reactor.core.publisher.Mono;
  import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors;
  public class AdvanceRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
      private static final Log log = LogFactory.getLog(AdvanceRoundRobinLoadBalancer.class);
      private final AtomicInteger position;     private final AtomicInteger positionAdvance;
      @Deprecated     private ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier;
      private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
      private final String serviceId;
 
      @Deprecated     public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier) {         this(serviceId, serviceInstanceSupplier, new Random().nextInt(1000));     }
      public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {         this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));     }
 
      public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {         this.serviceId = serviceId;         this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;         this.position = new AtomicInteger(seedPosition);         this.positionAdvance = new AtomicInteger(seedPosition);     }
      @Deprecated     public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier, int seedPosition) {         this.serviceId = serviceId;         this.serviceInstanceSupplier = serviceInstanceSupplier;         this.position = new AtomicInteger(seedPosition);         this.positionAdvance = new AtomicInteger(seedPosition);     }
 
      @Override
      public Mono<Response<ServiceInstance>> choose(Request request) {
 
          if (serviceInstanceListSupplierProvider != null) {             ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);             return supplier.get().next().map((instances) -> {                 // 此处做了选择逻辑的修改                 if (request instanceof AdvanceRequestContext) {                     List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());                     return getInstanceResponse(advanceInstance, request);                 } else {                     List<ServiceInstance> routineInstance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());                     return getInstanceResponse(routineInstance, request);                 }
              });         }         ServiceInstanceSupplier supplier = this.serviceInstanceSupplier.getIfAvailable(NoopServiceInstanceSupplier::new);         return supplier.get().collectList().map((instances) -> {             if (request instanceof AdvanceRequestContext) {                 // 此处做了选择逻辑的修改                 List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());                 return getInstanceResponse(advanceInstance, request);             } else {                 List<ServiceInstance> instance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());                 return getInstanceResponse(instance, request);             }
          });     }
      private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {         if (instances.isEmpty()) {             if (request instanceof AdvanceRequestContext) {                 return new AdvanceEmptyResponse();             }             log.warn("No servers available for service: " + this.serviceId);             return new EmptyResponse();         }         int pos = 1;         //灰度发布选择逻辑         if (request instanceof AdvanceRequestContext) {             pos = Math.abs(this.positionAdvance.incrementAndGet());         } else {             pos = Math.abs(this.position.incrementAndGet());         }         ServiceInstance instance = instances.get(pos % instances.size());         return new DefaultResponse(instance);
      }
  }
 
 
  
 | 
AdvanceEmptyResponse 类是为了标识无灰度发布服务器,此时可以走正常服务器
1 2 3 4 5 6 7 8 9 10 11 12 13
  
 | 
package top.lingma.gateway.loadbalancer;
  import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.CompletionContext; import org.springframework.cloud.client.loadbalancer.reactive.Response;
  public class AdvanceEmptyResponse extends org.springframework.cloud.client.loadbalancer.EmptyResponse implements Response<ServiceInstance> {     public AdvanceEmptyResponse() {     }
      public void onComplete(CompletionContext completionContext) {     } }
  
 | 
AdvanceRequestContext 是为了能从 GlobalFilter 传递信息到 LoadBalancer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
  
 | 
package top.lingma.gateway.loadbalancer;
  import org.springframework.cloud.client.loadbalancer.reactive.Request; import org.springframework.web.server.ServerWebExchange;
  public class AdvanceRequestContext<T> implements Request {
      private T exchange;
      public AdvanceRequestContext(T exchange) {         this.exchange = exchange;     }
      @Override     public T getContext() {         return exchange;     } }
 
  
 | 
AdvanceReactiveLoadBalancerClientFilter 复制于 ReactiveLoadBalancerClientFilter
注意两点
第一灰度服务器选择在ReactiveLoadBalancerClientFilter 之前 LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 – 1;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
  
 | 
package top.lingma.gateway.loadbalancer;
  import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools; import org.springframework.cloud.client.loadbalancer.reactive.Response; import org.springframework.cloud.gateway.config.LoadBalancerProperties; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter; import org.springframework.cloud.gateway.support.DelegatingServiceInstance; import org.springframework.cloud.gateway.support.NotFoundException; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.core.Ordered; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono;
  import java.net.URI; import java.util.List;
  import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
  @Component public class AdvanceReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
      private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);
      private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;
      private final LoadBalancerClientFactory clientFactory;
      private LoadBalancerProperties properties;
      public AdvanceReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {         this.clientFactory = clientFactory;         this.properties = properties;     }
      @Override     public int getOrder() {         return LOAD_BALANCER_CLIENT_FILTER_ORDER;     }
      @Override     @SuppressWarnings("Duplicates")     public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {         // 灰度用户专属服务器         判定是否是灰度用户,是否拥有灰度权限 不然直接进行下一步         List<String> secChUa = exchange.getRequest().getHeaders().get("sec-ch-ua");         if (secChUa == null || secChUa.isEmpty() || !secChUa.stream().findFirst().map(r -> r.contains("Edge")).orElse(false)) {             return chain.filter(exchange);         }
          URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);         String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);         if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {             return chain.filter(exchange);         }         // preserve the original url         addOriginalRequestUrl(exchange, url);
          if (log.isTraceEnabled()) {             log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);         }
          return choose(exchange).doOnNext(response -> {             if (response instanceof AdvanceEmptyResponse) {                 return;             }             if (!response.hasServer()) {                 throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());             }
              ServiceInstance retrievedInstance = response.getServer();
              URI uri = exchange.getRequest().getURI();
              // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,             // if the loadbalancer doesn't provide one.             String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";             if (schemePrefix != null) {                 overrideScheme = url.getScheme();             }
              DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
              URI requestUrl = reconstructURI(serviceInstance, uri);
              if (log.isTraceEnabled()) {                 log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);             }             exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);         }).then(chain.filter(exchange));     }
      protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {         return LoadBalancerUriTools.reconstructURI(serviceInstance, original);     }
      @SuppressWarnings("deprecation")     private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {         URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);         ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);         if (loadBalancer == null) {             throw new NotFoundException("No loadbalancer available for " + uri.getHost());         }         return loadBalancer.choose(createRequest(exchange));     }
      /***      * 此处进行了改造 传入了内容 方便后续 LoadBalancer 处理信息      * @param exchange      * @return      */     @SuppressWarnings("deprecation")     private AdvanceRequestContext<ServerWebExchange> createRequest(ServerWebExchange exchange) {         return new AdvanceRequestContext(exchange);     }
  }
 
  
 | 
以上已经完成了灰度发布的必要部分,再进行一下AutoConfiguration 注意,这里不能被Spring 扫描
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
  
 | 
package top.lingma.gateway.loadbalancer;
  import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.context.annotation.Bean; import org.springframework.core.env.Environment;
  @ConditionalOnDiscoveryEnabled public class AdvanceLoadBalancerAutoConfiguration {     @Bean     @ConditionalOnMissingBean     public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {         String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);         return new AdvanceRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);     } }
  
 | 
最后 启动类配置 @LoadBalancerClients 的 defaultConfiguration
1 2 3 4 5 6 7 8 9 10 11
  
 | 
 @SpringBootApplication() @LoadBalancerClients(defaultConfiguration = AdvanceLoadBalancerAutoConfiguration.class) public class LingmaGatewayApplication {
      public static void main(String[] args) {         SpringApplication.run(LingmaGatewayApplication.class, args);     }
  }
 
  
 | 
关注公众号 [龗孖] 或搜索公众号[lingmaW] , 获得更多新干货!!!– 本文链接: https://blog.lingma.top/2022/12/01/36d5a1ed4a38/spring-cloud-gateway基于nacos如何去做灰度发布/index.html
- 
版权声明: 本博客所有文章除特别声明外,均采用 反996许可证版本1.0 许可协议。转载请注明出处!