问题:当我尝试为 Spring Cloud Stream Kafka 配置多集群连接时,我的应用程序针对每个主题都会抛出错误:创建消费者绑定失败,将在 30 秒后重试 org.springframework.cloud.stream.provisioning.ProvisioningException:为主题进行配置(provisioning)时遇到异常
版本:Spring Boot 3.1.9,Spring Cloud 2022.0.5
基于官方github的应用配置文件:
spring:
application.name: ${application.name}
main.banner-mode: off
cloud:
function:
definition: someFunctionalConsumer
stream:
default-binder: kafka1
bindings:
someFunctionalConsumer-in-0:
destination: ${application.kafka.consumer.sourceTopicName},${application.kafka.consumer.retryTopicName}
binder: kafka1
group: ${CONSUMER_GROUP}
content-type: application/*+avro
consumer:
concurrency: ${LISTENER_CONCURRENCY:1}
stream-bridge-out:
destination: ${application.kafka.producer.destinationTopicName}
content-type: application/*+avro
binder: kafka2
producer:
useNativeEncoding: true
function:
definition: someFunctionalConsumer
binders:
kafka1:
type: kafka
default-candidate: true
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ${application.kafka.bootstrap-servers.consumer}
configuration:
bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
security.protocol: ${application.kafka.security.protocol}
sasl:
# Spring reports that this property is not used in the consumer config
oauthbearer.token.endpoint.url: ${OAUTHBEARER_TOKEN_ENDPOINT_URL}
jaas.config: ${application.kafka.auth.jaas.config}
mechanism: ${application.kafka.auth.mechanism}
login.callback.handler.class: ${application.kafka.auth.login.callback.handler.class}
kafka2:
type: kafka
default-candidate: false
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ${application.kafka.bootstrap-servers.producer}
configuration:
bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
security.protocol: ${application.kafka.security.protocol}
sasl:
oauthbearer.token.endpoint.url: ${OAUTHBEARER_TOKEN_ENDPOINT_URL}
jaas.config: ${application.kafka.auth.jaas.config}
mechanism: ${application.kafka.auth.mechanism}
login.callback.handler.class: ${application.kafka.auth.login.callback.handler.class}
kafka:
bindings:
someFunctionalConsumer-in-0:
consumer:
startOffset: ${START_OFFSET_POLICY:earliest}
ack-mode: RECORD
configuration:
session.timeout.ms: 35000
max.poll.records: ${MAX_POLL_RECORDS:1}
auto.register.schemas: false
specific.avro.reader: true
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.key.delegate.class: com.common.dlq.client.serializer.RawDataDeserializerProxy
dlq.client.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: com.common.dlq.client.serializer.RawDataDeserializerProxy
dlq.client.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.deserializer.value.function: com.nordstrom.orbit.fp.router.stream.FailedDeserializationFunction
schema.registry.url: ${application.kafka.schema-registry.schema.registry.url}
bearer.auth.credentials.source: ${application.kafka.auth.bearer.auth.credentials.source}
bearer.auth.logical.cluster: ${application.kafka.auth.bearer.auth.logical.cluster}
bearer.auth.identity.pool.id: ${application.kafka.auth.bearer.auth.identity.pool.id}
security.protocol: ${application.kafka.security.protocol}
stream-bridge-out:
producer:
sync: true
recordMetadataChannel: meta
configuration:
delivery.timeout.ms: 300000
acks: all
retries: 5
enable.idempotence: true
auto.register.schemas: false
avro.remove.java.properties: true
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
useNativeEncoding: true
schema.registry.url: ${application.kafka.schema-registry.schema.registry.url}
AdminClient 和消费者都回退到了默认配置,完全忽略了我提供的配置:
AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
消费者:
ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-null-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
我调试了几个不同的 Spring Cloud 配置 bean:KafkaBinderConfigurationProperties 中填充的是默认值;BindingServiceProperties 中填充了我提供的信息,但这些信息并未被采用。
我编写了一个通过 BinderFactory 获取 binder 的调试方法:
/**
 * Logs the broker list held by the {@code kafka1} and {@code kafka2} binders.
 *
 * <p>Diagnostic helper: each binder is looked up through {@code binderFactory} and the
 * brokers found in its {@code configurationProperties} field are written to the debug log,
 * so the effective per-binder configuration can be compared with what was supplied.
 */
void debugChannels() {
  logBinderBrokers("kafka1", "kafka consumer broker: {}");
  logBinderBrokers("kafka2", "kafka producer broker: {}");
}

/**
 * Looks up one binder by name and logs the brokers from its configuration properties.
 *
 * @param binderName name the binder is registered under in the binder factory
 * @param logFormat SLF4J-style message with a single {@code {}} placeholder for the brokers
 * @throws ClassCastException if the resolved binder is not a {@code KafkaMessageChannelBinder}
 */
private void logBinderBrokers(final String binderName, final String logFormat) {
  final Binder<MessageChannel, ?, ?> binder =
      binderFactory.getBinder(binderName, MessageChannel.class);
  final KafkaMessageChannelBinder kafkaBinder = (KafkaMessageChannelBinder) binder;
  // Read the binder's "configurationProperties" field directly instead of via an accessor.
  final KafkaBinderConfigurationProperties configuration =
      (KafkaBinderConfigurationProperties)
          new DirectFieldAccessor(kafkaBinder).getPropertyValue("configurationProperties");
  log.debug(logFormat, Arrays.toString(configuration.getBrokers()));
}
均返回:
kafka producer broker: [localhost]
根本原因:
@Import(KafkaBinderConfiguration.class)
它位于自定义的 Kafka 健康检查 bean 中(我不知道为什么导入 Spring Cloud 的这个配置类会破坏属性的收集)
但现在我遇到了另一个问题:Spring Cloud 无法为 streamBridge 创建绑定,并且我在以下代码中使用的自定义错误处理程序
/**
 * Provides a listener-container customizer that installs the supplied error handler as the
 * container's common error handler and switches the container's ack mode to {@code RECORD}.
 *
 * @param errorHandler error handler set on every container passed to the customizer
 * @return the customizer lambda
 */
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
    final StreamErrorHandler errorHandler) {
  return (listenerContainer, destinationName, consumerGroup) -> {
    listenerContainer.setCommonErrorHandler(errorHandler);
    // Destination and group are not consulted: every container gets the same settings.
    listenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
  };
}
也失效了