Service discovery does not work when using WebClient with Consul


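We are migrating from Spring Boot 2 to 3 and dropping Netflix Ribbon. We are running into a problem when trying to discover services through Consul. If we roll back to Spring Boot 2 + Netflix Ribbon, everything works, so we have ruled out connectivity issues.

Logged error: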

RoundRobinLoadBalancer|No servers available for service: cachedavailability-integrations-service
ReactorLoadBalancerExchangeFilterFunction|LoadBalancer does not contain an instance for the service cachedavailability-integrations-service
Communication error with uri: http://cachedavailability-integrations-service/testing org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN 
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:336)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 503 SERVICE_UNAVAILABLE from GET http://cachedavailability-integrations-service/testing [DefaultWebClient]
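
To make the first log line easier to read: a round-robin chooser can only rotate over whatever instance list discovery hands it, and an empty list leaves it nothing to pick, which the filter then surfaces as a 503. The following is a minimal plain-Java sketch of that idea. The class `RoundRobinSketch` and the instance addresses are hypothetical and this is not Spring Cloud's implementation.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a round-robin chooser; not Spring's code.
final class RoundRobinSketch {
    private final AtomicInteger position = new AtomicInteger();

    // Picks the next instance in rotation, or empty when discovery returned nothing.
    Optional<String> choose(List<String> instances) {
        if (instances.isEmpty()) {
            return Optional.empty(); // corresponds to "No servers available"
        }
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return Optional.of(instances.get(index));
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch();
        List<String> found = List.of("10.0.0.1:8080", "10.0.0.2:8080"); // made-up addresses
        System.out.println(lb.choose(found));
        System.out.println(lb.choose(found));
        System.out.println(lb.choose(List.of())); // empty discovery result
    }
}
```

So the 503 here most likely means the instance list was empty for that service id at request time, not that the target service itself answered with an error.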

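Consul UI (image)

  1. The "si-manager" service is trying to discover and call the "cachedavailability" service.
  2. It registers fine, but it does not discover the other registered services when using the WebClient bean.

We have already tried a number of approaches, such as: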

  1. https://docs.spring.io/spring-cloud-consul/docs/current/reference/html/#using-the-discoveryclient
  2. Service discovery with Spring WebFlux WebClient
  3. Configuring the spring-cloud load balancer without auto-configuration

Sample

Main class

@org.springframework.cloud.client.discovery.EnableDiscoveryClient
public class MainApplication {...}

WebClient configuration

  @Bean(name = "webClientConsulAvailability")
  public WebClient webClientConsulAvailability(
    WebClient.Builder webClientBuilder,
    ReactorLoadBalancerExchangeFilterFunction lbFunction,
    ExchangeFilterFunction logFilter
  ) {
    return webClientBuilder
      .filter(lbFunction)
      .filter(logFilter)
      .build();
  }

bootstrap.yml

spring:
  application:
    name: si-manager-service
  profiles:
    active: ${SPRING_PROFILES_ACTIVE:local}
  cloud:
    consul:
      host: localhost
      port: 8500
      enabled: true
      discovery:
        serviceName: ${spring.application.name}
        instanceId: ${spring.application.name}8500
        enabled: true
        # Register as a service in consul.
        register: true
        registerHealthCheck: true

Dependencies (image)

Consul version: v1.15.3

Usage example:

webClientConsulAvailability.get()
      .uri("http://cachedavailability-integrations-service/testing")
      .retrieve()
      .bodyToFlux(MyDTO.class)
      .doOnError(e -> {
        if (isErrorLogLevel(e)) {
          log.error(COMMUNICATION_ERROR_WITH_URI + uri, e);
        } else {
          log.warn(COMMUNICATION_ERROR_WITH_URI + uri, e);
        }
      })
      .onErrorResume(e -> Flux.empty());
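
For context on the URL used above: with a load-balancing filter, the host part of the URI is not a DNS name but the logical service id, and the filter swaps it for a concrete host and port before the request goes out. A rough plain-Java sketch of that rewrite follows. The class `ServiceUriSketch` and the instance address are hypothetical and only illustrate the idea.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical illustration of how a logical service URI maps to a concrete instance.
final class ServiceUriSketch {
    // The logical service id is simply the host part of the request URI.
    static String serviceId(URI request) {
        return request.getHost();
    }

    // Rebuilds the URI against a concrete instance, keeping scheme, path and query.
    static URI resolve(URI request, String host, int port) {
        try {
            return new URI(request.getScheme(), null, host, port,
                    request.getPath(), request.getQuery(), request.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI for " + host, e);
        }
    }

    public static void main(String[] args) {
        URI request = URI.create("http://cachedavailability-integrations-service/testing");
        System.out.println(serviceId(request));
        // The instance address below is made up for the example.
        System.out.println(resolve(request, "10.0.0.5", 8080));
    }
}
```

The service id extracted this way has to match the name under which the target service is registered in Consul, so it is worth comparing the two when the load balancer reports no instances.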
java spring-boot spring-cloud consul spring-cloud-consul
1 answer

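Fixed with the configuration below. The relevant change is that the load-balanced WebClient now uses DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> as its filter instead of injecting ReactorLoadBalancerExchangeFilterFunction directly.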

package xpto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.DeferringLoadBalancerExchangeFilterFunction;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancedExchangeFilterFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.netty.http.client.HttpClient;

import java.util.concurrent.TimeUnit;

@Configuration
@EnableDiscoveryClient
@Slf4j
public class WebclientConfiguration {

  private final ObjectMapper objectMapper;
  // Timeouts in milliseconds; values are injected through the constructor below,
  // so the fields themselves do not need @Value.
  private final int webClientReadTimeout;
  private final int webClientConnectionTimeout;

  public WebclientConfiguration(ObjectMapper objectMapper,
    @Value("${web.client.read-timeout:25000}") int webClientReadTimeout,
    @Value("${web.client.connection-timeout:3000}") int webClientConnectionTimeout) {
    this.objectMapper = objectMapper;
    this.webClientReadTimeout = webClientReadTimeout;
    this.webClientConnectionTimeout = webClientConnectionTimeout;
  }

  private ClientHttpConnector getClientHttpConnector() {
    return new ReactorClientHttpConnector(
      HttpClient.create().compress(true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConnectionTimeout)
        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientReadTimeout, TimeUnit.MILLISECONDS))));
  }

  @Bean
  public DefaultUriBuilderFactory builderFactory() {
    DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
    factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.VALUES_ONLY);
    return factory;
  }

  @Bean(name = "webClientConsul")
  public WebClient webClientConsul(
    WebClient.Builder webClientBuilder,
    DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> exchangeFilterFunction
  ) {
    webClientBuilder.filter(exchangeFilterFunction);
    return buildWebClient(webClientBuilder);
  }

  @Bean(name = "webClientDefault")
  public WebClient webClientDefault(WebClient.Builder webClientBuilder) {
    return buildWebClient(webClientBuilder);
  }

  private WebClient buildWebClient(WebClient.Builder webClientBuilder) {
    ClientHttpConnector connector = getClientHttpConnector();
    return webClientBuilder
      .clientConnector(connector)
      .exchangeStrategies(getExchangeStrategies())
      .build();
  }

  private ExchangeStrategies getExchangeStrategies() {
    return ExchangeStrategies.builder()
      .codecs(clientDefaultCodecsConfigurer -> {
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonEncoder(
            new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonDecoder(
            new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
      }).build();
  }
  
}
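
As its name suggests, DeferringLoadBalancerExchangeFilterFunction does not hold the real load-balancing filter from the start; it looks the delegate up when the first request passes through. The plain-Java sketch below shows only that deferral pattern. The class `DeferringFilterSketch` and its string-based "requests" are hypothetical, and it makes no claim about why the change fixes discovery in this particular setup.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a filter that looks up its real delegate lazily.
// Not thread-safe; a sketch of the pattern only.
final class DeferringFilterSketch implements Function<String, String> {
    private final Supplier<Function<String, String>> provider;
    private Function<String, String> delegate; // resolved on first use, then cached

    DeferringFilterSketch(Supplier<Function<String, String>> provider) {
        this.provider = provider;
    }

    @Override
    public String apply(String request) {
        if (delegate == null) {
            delegate = provider.get(); // lookup happens here, not in the constructor
        }
        return delegate.apply(request);
    }

    public static void main(String[] args) {
        DeferringFilterSketch filter = new DeferringFilterSketch(() -> {
            System.out.println("resolving delegate");
            return request -> "balanced:" + request;
        });
        System.out.println("filter constructed");
        System.out.println(filter.apply("/testing"));
        System.out.println(filter.apply("/testing"));
    }
}
```

Running it shows that constructing the wrapper does not touch the provider; the lookup happens once, on the first call, and is reused afterwards.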