A simple Confluent Kafka Spring Boot application does not work; it does not seem to read application.properties


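I created a Confluent Kafka broker through the Confluent site and created a topic via the CLI. I then created a simple Spring Boot project with Spring Initializr, including the Kafka library, and used the sample code provided by Confluent to create simple producer and consumer classes: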

https://github.com/johnvlittle/kafka1

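However, this does not work. It does not appear to read the application.properties file, because it tries to connect to localhost instead of the configured bootstrap server: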

2024-02-24T21:46:05.870+01:00  INFO 97874 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2024-02-24T21:46:05.886+01:00  INFO 97874 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2024-02-24T21:46:05.940+01:00  INFO 97874 --- [           main] o.a.k.clients.producer.ProducerConfig    : These configurations '[session.timeout.ms]' were supplied but are not used yet.
2024-02-24T21:46:05.941+01:00  INFO 97874 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.6.1
2024-02-24T21:46:05.942+01:00  INFO 97874 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 5e3c2b738d253ff5
2024-02-24T21:46:05.942+01:00  INFO 97874 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1708807565941
2024-02-24T21:46:06.034+01:00  INFO 97874 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
2024-02-24T21:46:06.034+01:00  WARN 97874 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2024-02-24T21:46:06.035+01:00  WARN 97874 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-02-24T21:46:06.140+01:00  INFO 97874 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.

The application.properties file seems to be the problem:

# Required connection configs for Kafka producer, consumer, and admin
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-xxx.eu-north-1.aws.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='6IJSJXExxx' password='Fvfu4YcYhPJbULws0Usxxx';
spring.kafka.properties.security.protocol=SASL_SSL

# Best practice for higher availability in Apache Kafka clients prior to 3.0
spring.kafka.properties.session.timeout.ms=45000
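
For context, Spring Boot's property reference describes `spring.kafka.properties.*` as additional properties, common to producers and consumers, that are passed to the Kafka client: the prefix is dropped and the rest of the key is used as a plain Kafka client setting. The JDK-only sketch below models that mapping. The class `KafkaPassThrough` and the method `toClientConfig` are hypothetical names chosen for illustration; this is not Spring Boot's own code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the pass-through idea, not Spring Boot internals.
class KafkaPassThrough {
    static final String PREFIX = "spring.kafka.properties.";

    // Keep only keys under the pass-through prefix and strip that prefix,
    // leaving the plain Kafka client key (e.g. "sasl.mechanism").
    static Map<String, String> toClientConfig(Map<String, String> appProps) {
        Map<String, String> client = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : appProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                client.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return client;
    }

    public static void main(String[] args) {
        Map<String, String> app = new LinkedHashMap<>();
        app.put("spring.kafka.properties.sasl.mechanism", "PLAIN");
        app.put("spring.kafka.properties.security.protocol", "SASL_SSL");
        app.put("spring.kafka.properties.session.timeout.ms", "45000");
        // Prints the keys as the Kafka client would see them.
        toClientConfig(app).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Under this reading, `spring.kafka.properties.sasl.mechanism` reaches the client as `sasl.mechanism`, which is the key name used in the Apache Kafka client configuration.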

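If I take the credentials and the bootstrap server and enter them into the Offset Explorer application, I can connect to the Kafka broker and see the topic (no messages yet).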

spring.kafka.properties does not seem to be documented, but I found an alternative form for the bootstrap server:

spring.kafka.bootstrap-servers=pkc-xxx.eu-north-1.aws.confluent.cloud:9092

With this alternative form the application gets further, but it still does not seem to produce any messages, and ./mvnw spring-boot:run gives this output:

2024-02-24T22:17:31.587+01:00  INFO 98914 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.6.1
2024-02-24T22:17:31.587+01:00  INFO 98914 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 5e3c2b738d253ff5
2024-02-24T22:17:31.587+01:00  INFO 98914 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1708809451587
2024-02-24T22:17:31.588+01:00  INFO 98914 --- [           main] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Subscribed to topic(s): bets
2024-02-24T22:17:32.207+01:00  INFO 98914 --- [ad | producer-1] com.ohds.kafka1.Producer                 : 

 Produced event to topic bets: key = key        value = value 


2024-02-24T22:17:32.268+01:00  INFO 98914 --- [yConsumer-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Cluster ID: lkc-219x1y
2024-02-24T22:17:32.269+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Discovered group coordinator b9-pkc-zm3p0.eu-north-1.aws.confluent.cloud:9092 (id: 2147483638 rack: null)
2024-02-24T22:17:32.327+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] (Re-)joining group
2024-02-24T22:17:32.986+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Request joining group due to: need to re-join with the given member-id: consumer-springboot-group-1-1-f3f727a2-6ba0-4e97-8279-628ce8df8f35
2024-02-24T22:17:32.987+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
2024-02-24T22:17:32.988+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] (Re-)joining group
2024-02-24T22:17:33.057+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Successfully joined group with generation Generation{generationId=3, memberId='consumer-springboot-group-1-1-f3f727a2-6ba0-4e97-8279-628ce8df8f35', protocol='range'}
2024-02-24T22:17:33.064+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Finished assignment for group at generation 3: {consumer-springboot-group-1-1-f3f727a2-6ba0-4e97-8279-628ce8df8f35=Assignment(partitions=[bets-0, bets-1, bets-2, bets-3, bets-4, bets-5])}
2024-02-24T22:17:33.136+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Successfully synced group in generation Generation{generationId=3, memberId='consumer-springboot-group-1-1-f3f727a2-6ba0-4e97-8279-628ce8df8f35', protocol='range'}
2024-02-24T22:17:33.136+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Notifying assignor about the new Assignment(partitions=[bets-0, bets-1, bets-2, bets-3, bets-4, bets-5])
2024-02-24T22:17:33.137+01:00  INFO 98914 --- [yConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-springboot-group-1-1, groupId=springboot-group-1] Adding newly assigned partitions: bets-0, bets-1, bets-2, bets-3, bets-4, bets-5
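
One possible explanation for the difference between the two runs, offered as an assumption rather than a confirmed diagnosis: in recent Spring Boot 3.x releases the auto-configuration appears to write the value of `spring.kafka.bootstrap-servers` (default `localhost:9092`) into the client configuration after the `spring.kafka.properties.*` map has been applied, so a `bootstrap.servers` entry supplied only through that map is overwritten. The first log is consistent with this: `AbstractLogin` reports a successful login and `ProducerConfig` mentions `session.timeout.ms`, both of which come from the `spring.kafka.properties.*` keys, so the file was being read. The JDK-only sketch below models that ordering; `EffectiveConfig` and `build` are hypothetical names, and this is not Spring Boot's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the assumed precedence, not Spring Boot internals.
class EffectiveConfig {
    static final String DEFAULT_BOOTSTRAP = "localhost:9092";

    // passThrough: keys from spring.kafka.properties.* with the prefix removed.
    // dedicatedBootstrap: value of spring.kafka.bootstrap-servers, or null if unset.
    static Map<String, String> build(Map<String, String> passThrough, String dedicatedBootstrap) {
        Map<String, String> config = new LinkedHashMap<>(passThrough);
        String bootstrap = (dedicatedBootstrap != null) ? dedicatedBootstrap : DEFAULT_BOOTSTRAP;
        // Assumption: the dedicated property is applied last and therefore wins.
        config.put("bootstrap.servers", bootstrap);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> passThrough = new LinkedHashMap<>();
        passThrough.put("bootstrap.servers", "pkc-xxx.eu-north-1.aws.confluent.cloud:9092");
        passThrough.put("security.protocol", "SASL_SSL");

        // Dedicated property unset: the default replaces the pass-through value.
        System.out.println(build(passThrough, null).get("bootstrap.servers"));
        // Dedicated property set: the cloud address is used.
        System.out.println(build(passThrough, "pkc-xxx.eu-north-1.aws.confluent.cloud:9092")
                .get("bootstrap.servers"));
    }
}
```

In this model the security settings survive in both cases and only `bootstrap.servers` changes, which would match a producer that logs in successfully but then tries to reach localhost.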

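So it looks like the Kafka keys in application.properties are wrong, but they were generated by Confluent, the company founded by Kafka's original creators, so they should know what they are doing?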

I cannot find the "correct" equivalents of the following spring.kafka.properties keys, which do not seem to work:

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=xxx
spring.kafka.properties.security.protocol=SASL_SSL

Does anyone know the correct configuration keys?

java spring-boot apache-kafka spring-kafka
1 Answer

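The solution is to change application.properties by removing the .properties segment from the keys. It is odd that Confluent would provide incorrect settings, but this is a working version of application.properties: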

spring.kafka.bootstrap-servers=pkc-xxx.eu-north-1.aws.confluent.cloud:9092
spring.kafka.sasl.mechanism=PLAIN
spring.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='6IJSJXxxx' password='Fvfu4YcYhPJbxxxx';
spring.kafka.security.protocol=SASL_SSL