我正在尝试将 Testcontainers 集成到我的 Spring Boot 应用程序中。 我正在使用 Kafka 容器来发送 Kafka 消息。 主要问题是,当我正确向 kafka 发送消息时,我想知道应该采用什么方法来验证消息是否正确发送。 因为我不想在我的服务中包含消费者部分,其中只有生产者。
我不是在嘲笑生产者接口,所以我无法验证发送方法是否被调用。 先谢谢你了
您可以简单地在单元测试中创建一个
KafkaTemplate
并使用其 receive()
API:https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/template-receive.html 。必须提供 ConsumerFactory
到 KafkaTemplate
:
/**
* Set a consumer factory for receive operations.
* @param consumerFactory the consumer factory.
* @since 2.8
*/
public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
另一种选择是使用
KafkaTestUtils.getOneRecord()
:
/**
* Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.
* @param brokerAddresses the broker address(es).
* @param group the group.
* @param topic the topic.
* @param partition the partition.
* @param seekToLast true to fetch an existing last record, if present.
* @param commit commit offset after polling or not.
* @param timeout the timeout.
* @return the record or null if no record received.
* @since 2.9.3
*/
@Nullable
@SuppressWarnings({ "rawtypes", "unchecked" })
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
boolean seekToLast, boolean commit, Duration timeout) {
但是,听起来您并没有在项目中使用
spring-kafka-test
模块。