Spring Cloud Stream multi-cluster connection: AdminClient and Consumer throw an error: Failed to create consumer binding


Problem: when I try to configure a multi-cluster connection for Spring Cloud Stream Kafka, the application throws an error for every topic: "Failed to create consumer binding; retrying in 30 seconds", caused by org.springframework.cloud.stream.provisioning.ProvisioningException: provisioning exception encountered for the topic.

Versions: Spring Boot 3.1.9, Spring Cloud 2022.0.5

The application configuration file, based on the official GitHub samples:

spring:
  application.name: ${application.name}
  main.banner-mode: off
  cloud:
    function:
      definition: someFunctionalConsumer
    stream:
      default-binder: kafka1
      bindings:
        someFunctionalConsumer-in-0:
          destination: ${application.kafka.consumer.sourceTopicName},${application.kafka.consumer.retryTopicName}
          binder: kafka1
          group: ${CONSUMER_GROUP}
          content-type: application/*+avro
          consumer:
            concurrency: ${LISTENER_CONCURRENCY:1}
        stream-bridge-out:
          destination: ${application.kafka.producer.destinationTopicName}
          content-type: application/*+avro
          binder: kafka2
          producer:
            useNativeEncoding: true
      function:
        definition: someFunctionalConsumer
      binders:
        kafka1:
          type: kafka
          default-candidate: true
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${application.kafka.bootstrap-servers.consumer}
                      configuration:
                        bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
                        bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
                        bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
                        security.protocol: ${application.kafka.security.protocol}
                        sasl:
                          # Spring reports that this property is not used in the consumer configuration
                          oauthbearer.token.endpoint.url: ${OAUTHBEARER_TOKEN_ENDPOINT_URL}
                          jaas.config: ${application.kafka.auth.jaas.config}
                          mechanism: ${application.kafka.auth.mechanism}
                          login.callback.handler.class: ${application.kafka.auth.login.callback.handler.class}
        kafka2:
          type: kafka
          default-candidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${application.kafka.bootstrap-servers.producer}
                      configuration:
                        bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
                        bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
                        bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
                        security.protocol: ${application.kafka.security.protocol}
                        sasl:
                          oauthbearer.token.endpoint.url: ${OAUTHBEARER_TOKEN_ENDPOINT_URL}
                          jaas.config: ${application.kafka.auth.jaas.config}
                          mechanism: ${application.kafka.auth.mechanism}
                          login.callback.handler.class: ${application.kafka.auth.login.callback.handler.class}
      kafka:
        bindings:
          someFunctionalConsumer-in-0:
            consumer:
              startOffset: ${START_OFFSET_POLICY:earliest}
              ack-mode: RECORD
              configuration:
                session.timeout.ms: 35000
                max.poll.records: ${MAX_POLL_RECORDS:1}
                auto.register.schemas: false
                specific.avro.reader: true
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.key.delegate.class: com.common.dlq.client.serializer.RawDataDeserializerProxy
                dlq.client.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: com.common.dlq.client.serializer.RawDataDeserializerProxy
                dlq.client.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
                spring.deserializer.value.function: com.nordstrom.orbit.fp.router.stream.FailedDeserializationFunction
                schema.registry.url: ${application.kafka.schema-registry.schema.registry.url}
                bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
                bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
                bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
                security.protocol: ${application.kafka.security.protocol}
          stream-bridge-out:
            producer:
              sync: true
              recordMetadataChannel: meta
              configuration:
                delivery.timeout.ms: 300000
                acks: all
                retries: 5
                enable.idempotence: true
                auto.register.schemas: false
                avro.remove.java.properties: true
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
                useNativeEncoding: true
                schema.registry.url: ${application.kafka.schema-registry.schema.registry.url}
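
For context, the function definition and bindings above correspond to application code along these lines (a minimal sketch; the real payload types and handler logic are application-specific, so Message<byte[]> here is just a placeholder):

import java.util.function.Consumer;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class StreamFunctionConfig {

  // Bound to someFunctionalConsumer-in-0 (kafka1 binder) via
  // spring.cloud.function.definition and the bindings section above.
  @Bean
  public Consumer<Message<byte[]>> someFunctionalConsumer(final StreamBridge streamBridge) {
    return message ->
        // Publishes through the stream-bridge-out binding, which the
        // configuration above routes to the kafka2 binder.
        streamBridge.send("stream-bridge-out", message.getPayload());
  }
}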

The AdminClient and the consumer fall back to the default configuration and do not take into account anything I provided:

AdminClientConfig values:
        auto.include.jmx.reporter = true
        bootstrap.servers = [localhost:9092]
        client.dns.lookup = use_all_dns_ips
        client.id =
        connections.max.idle.ms = 300000
        default.api.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

The consumer:

ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.include.jmx.reporter = true
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-null-1
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = null
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

I debugged the various Spring Cloud configuration beans: KafkaBinderConfigurationProperties is populated with default values, while BindingServiceProperties is populated with my settings, which are nevertheless not taken into account.

I wrote a debug method that loads the binders through the BinderFactory:

void debugChannels() {
    // Look up each binder by name, then read its private
    // "configurationProperties" field through a DirectFieldAccessor.
    final Binder<MessageChannel, ?, ?> binder1 =
        binderFactory.getBinder("kafka1", MessageChannel.class);
    final KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
    final DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
    final KafkaBinderConfigurationProperties configuration1 =
        (KafkaBinderConfigurationProperties)
            directFieldAccessor1.getPropertyValue("configurationProperties");

    log.debug("kafka consumer broker: {}", Arrays.toString(configuration1.getBrokers()));

    final Binder<MessageChannel, ?, ?> binder2 =
        binderFactory.getBinder("kafka2", MessageChannel.class);
    final KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
    final DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
    final KafkaBinderConfigurationProperties configuration2 =
        (KafkaBinderConfigurationProperties)
            directFieldAccessor2.getPropertyValue("configurationProperties");

    log.debug("kafka producer broker: {}", Arrays.toString(configuration2.getBrokers()));
}

Both calls return:

kafka producer broker: [localhost]

spring spring-boot spring-cloud spring-cloud-stream
1 Answer

Root cause:

@Import(KafkaBinderConfiguration.class)

in a custom Kafka health-check bean (I don't know why importing this Spring Cloud configuration breaks the collection of properties). But now I have a different problem: Spring Cloud cannot create the binding for StreamBridge, and the custom error handler that I register in the following customizer is broken:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
    final StreamErrorHandler errorHandler) {
  return (container, dest, group) -> {
    // Attach the custom error handler and acknowledge each record individually.
    container.setCommonErrorHandler(errorHandler);
    final ContainerProperties properties = container.getContainerProperties();
    properties.setAckMode(AckMode.RECORD);
  };
}

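For reference, the offending import sat in a configuration class roughly like this (a minimal sketch; the class name is hypothetical, and the comment is my best guess at the mechanism, since the fix was simply to delete the @Import):

import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
// Importing the binder configuration into the main application context
// appears to register KafkaBinderConfigurationProperties there, so it is
// bound from the root environment (all defaults) instead of the isolated
// per-binder environments defined under spring.cloud.stream.binders.*.
@Import(KafkaBinderConfiguration.class) // removing this line restored the multi-binder properties
public class KafkaHealthCheckConfiguration {
  // custom Kafka health-check beans ...
}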
