Spring Cloud Dataflow kafka-source-kafka throws an error

Problem description

I am trying to create a stream application using the SCDF kafka-source-kafka app as the Kafka supplier. Below are the properties that configure the external source Kafka cluster as the consumer configuration.

app.kafka-source-kafka.kafka.supplier.topics=TEST_TOPIC
app.kafka-source-kafka.spring.kafka.consumer.bootstrap-servers=xxxxxxxx
app.kafka-source-kafka.spring.kafka.consumer.enable-auto-commit=true
app.kafka-source-kafka.spring.kafka.consumer.group-id=B2B_EH_PASTHROUGH_FALLBACK
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxx\" password=\"xxxxxx\";
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.mechanism=PLAIN
app.kafka-source-kafka.spring.kafka.consumer.properties.security.protocol=SASL_SSL
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.enabled.protocols=TLSv1.2
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.location=/etc/secrets/kafka.client.truststore.jks
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.password=xxxx
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.type=JKS
app.kafka-source-kafka.spring.kafka.listener.async-acks=true
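
For comparison, the Spring Cloud Stream Kafka binder that backs the app's output side keeps its own broker settings, separate from the spring.kafka.consumer.* properties above. A minimal sketch of that binder-side configuration (the broker address is a placeholder; these lines are only an illustration, not part of the deployment above):

app.kafka-source-kafka.spring.cloud.stream.kafka.binder.brokers=<binder-broker-host:9092>
app.kafka-source-kafka.spring.cloud.stream.kafka.binder.configuration.security.protocol=PLAINTEXT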


Logs:
2024-01-04T05:45:27.711Z  INFO 1 --- [main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [xxxxx:9090]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-B2B_EH_PASTHROUGH_FALLBACK-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = B2B_EH_PASTHROUGH_FALLBACK

After a few minutes:

"clients.consumer.ConsumerConfig" values got changed and trying to connect internal kafka brokers.
2024-01-04T05:46:45.754Z  INFO 1 --- [binder-health-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [10.100.172.208:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips

Error: Connection to node -1 (my-release-kafka.springdata.svc.cluster.local/10.100.172.208:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed

I am not sure why the consumer/supplier bootstrap-servers value changed after the consumer successfully joined the group on the external source Kafka.

The login succeeds:
2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.4.1
2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8a516edc2755df89
2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1704347128225
2024-01-04T05:45:28.318Z  INFO 1 --- [main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-B2B_EH_PASTHROUGH_FALLBACK-1, groupId=B2B_EH_PASTHROUGH_FALLBACK] Subscribed to topic(s): EIS.TOPIC.…
2024-01-04T05:45:28.409Z  INFO 1 --- [main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started bean 'kafkaMessageDrivenChannelAdapterSpec'; defined in: 'class path resource [org/springframework/cloud/fn/supplier/kafka/KafkaSupplierConfiguration.class]'; from source: 'org.springframework.cloud.fn.supplier.kafka.KafkaSupplierConfiguration.kafkaMessageDrivenChannelAdapterSpec(org.springframework.cloud.fn.supplier.kafka.KafkaSupplierProperties,org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.cloud.fn.common.config.ComponentCustomizer)'
2024-01-04T05:45:28.521Z  INFO 1 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2024-01-04T05:45:28.715Z  INFO 1 --- [main] .c.s.a.k.s.k.KafkaSourceKafkaApplication : Started KafkaSourceKafkaApplication in 31.902 seconds (process running for 34.939)

spring-cloud-dataflow
1 Answer

The binder-health prefix indicates a thread of the KafkaBinderHealthIndicator. We see those o.a.k.clients.consumer.ConsumerConfig values logged every time consumerFactory.createConsumer() is called, and there is indeed a metadata call in the KafkaBinderHealthIndicator.

So, nothing got changed; that second log comes from a new KafkaConsumer instance.

It is created by the kafkaBinderHealthIndicator bean definition:

    if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                configurationProperties.getKafkaConnectionString());
    }
    ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(
            kafkaMessageChannelBinder, consumerFactory);
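
For context, a health check of this general shape simply creates a short-lived consumer from that ConsumerFactory and performs a metadata call, which is why a fresh ConsumerConfig dump shows up on the binder-health thread each time the health endpoint is polled. The class below is only a minimal sketch of that pattern; the class name and the listTopics-based check are my own illustration, not the actual KafkaBinderHealthIndicator source:

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.HealthIndicator;
    import org.springframework.kafka.core.ConsumerFactory;

    // Illustrative only: a metadata-based health check over a ConsumerFactory.
    public class MetadataHealthCheckSketch implements HealthIndicator {

        private final ConsumerFactory<?, ?> consumerFactory;

        public MetadataHealthCheckSketch(ConsumerFactory<?, ?> consumerFactory) {
            this.consumerFactory = consumerFactory;
        }

        @Override
        public Health health() {
            // Each call creates a new KafkaConsumer, hence the repeated
            // ConsumerConfig logging on the health-check thread.
            try (Consumer<?, ?> consumer = this.consumerFactory.createConsumer()) {
                Map<String, ?> topics = consumer.listTopics(Duration.ofSeconds(10));
                return Health.up().withDetail("topicCount", topics.size()).build();
            }
            catch (Exception ex) {
                return Health.down(ex).build();
            }
        }
    }

As the quoted bean definition shows, when bootstrap.servers is not set explicitly for the binder, the factory falls back to the binder's own connection string, which is why this extra consumer points at the internal brokers rather than the external cluster configured for the supplier.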

I am not sure why the health indicator ends up with a different broker configuration after all this logic, but if you have a simple Spring Cloud Stream application that reproduces it, feel free to open a Spring Cloud Stream GH issue so we can investigate further on our side.

This is not related to Spring Cloud Dataflow.
