Spring Boot3中无法使用EmbeddedKafka发送Kafka消息

问题描述 投票:0回答:1

我的 Spring Boot 应用程序(版本 3.2.1)和嵌入式 Kafka 面临问题。问题是KafkaTemplate没有成功发送消息

这是我的测试代码


import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@ExtendWith(MockitoExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }, controlledShutdown = true)
public class PaymentITest {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${demo.kafka.topic.payment}")
    private String topicName;

    @Test
    void test() {
        String jsonPayload = "{ \"payload\": { \"payment_id\": \"1\" } }";

        kafkaTemplate.send(topicName, "key", jsonPayload);
    }
}

**卡夫卡消费者**

import fr.test.test.compute.common.event.PaymentEvent;
import fr.test.test.compute.core.domain.port.api.PaymentRequester;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {

    private final PaymentRequester paymentRequester;

    @KafkaListener(topics = "#{'${demo.kafka.topic.payment}'}",
                   groupId = "#{'${demo.kafka.group-id}'}")
    public void consumePaymentEvents(PaymentEvent paymentEvent)  {
        paymentRequester.handlePaymentReceivedEvent(paymentEvent.extractModel());
    }

}

测试/资源/application.yml

spring:
  mongodb:
    embedded:
      storage:
        oplogSize: 10
        repl-set-name: rs0
      version: "5.0.5"
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      group-id: kafka-group-id
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: fr.test.test.compute.config.kafka.KafkaEventDeserializer
    producer:
      bootstrap.servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: fr.test.test.compute.config.kafka.KafkaEventSerializer

de:
  flapdoodle:
    mongodb:
      embedded:
        version: 4.0.2

demo:
  kafka:
    topic:
      payment: payment_topic2
    group-id: kafka-group

我已经与当地的 kafka 经纪人进行了测试,消费者工作得很好!

但是在运行测试时,我在消费者中添加了一个断点,而最后一个没有被击中!是什么导致 KafkaTemplate 不发送消息(如下图所示)?是不是SpringBoot 3中嵌入Kafka相关的问题

这里是代码库的 Github 存储库📁https://github.com/smaillns/springboot-mongo-kafka

如有任何建议,我们将不胜感激?

java.util.concurrent.CompletableFuture@bfc918c[Not completed]
spring-boot spring-kafka spring-boot-3 spring-kafka-test
1个回答
0
投票

您是否尝试过在测试资源

bootstrap-servers: localhost:9092
中注释掉
application.yml
?我也遇到过类似的问题,这对我有帮助,否则我的应用程序会尝试在 Docker 中调用我的 Kafka。

© www.soinside.com 2019 - 2024. All rights reserved.