我的 Spring Boot 应用程序(版本 3.2.1)和嵌入式 Kafka 面临问题。问题是KafkaTemplate没有成功发送消息
这是我的测试代码
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
@ExtendWith(MockitoExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }, controlledShutdown = true)
public class PaymentITest {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Value("${demo.kafka.topic.payment}")
private String topicName;
@Test
void test() {
String jsonPayload = "{ \"payload\": { \"payment_id\": \"1\" } }";
kafkaTemplate.send(topicName, "key", jsonPayload);
}
}
**卡夫卡消费者**
import fr.test.test.compute.common.event.PaymentEvent;
import fr.test.test.compute.core.domain.port.api.PaymentRequester;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {
private final PaymentRequester paymentRequester;
@KafkaListener(topics = "#{'${demo.kafka.topic.payment}'}",
groupId = "#{'${demo.kafka.group-id}'}")
public void consumePaymentEvents(PaymentEvent paymentEvent) {
paymentRequester.handlePaymentReceivedEvent(paymentEvent.extractModel());
}
}
测试/资源/application.yml
spring:
mongodb:
embedded:
storage:
oplogSize: 10
repl-set-name: rs0
version: "5.0.5"
kafka:
consumer:
bootstrap-servers: localhost:9092
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
group-id: kafka-group-id
auto-offset-reset: earliest
enable-auto-commit: false
properties:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: fr.test.test.compute.config.kafka.KafkaEventDeserializer
producer:
bootstrap.servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: fr.test.test.compute.config.kafka.KafkaEventSerializer
de:
flapdoodle:
mongodb:
embedded:
version: 4.0.2
demo:
kafka:
topic:
payment: payment_topic2
group-id: kafka-group
我已经与当地的 kafka 经纪人进行了测试,消费者工作得很好!
但是在运行测试时,我在消费者中添加了一个断点,而最后一个没有被击中!是什么导致 KafkaTemplate 不发送消息(如下图所示)?是不是SpringBoot 3中嵌入Kafka相关的问题
这里是代码库的 Github 存储库📁https://github.com/smaillns/springboot-mongo-kafka
如有任何建议,我们将不胜感激?
java.util.concurrent.CompletableFuture@bfc918c[Not completed]
您是否尝试过在测试资源
bootstrap-servers: localhost:9092
中注释掉application.yml
?我也遇到过类似的问题,这对我有帮助,否则我的应用程序会尝试在 Docker 中调用我的 Kafka。