在Spring Boot中使用EmbeddedKafka测试Kafka消费者@KafkaListener

问题描述 投票:0回答:1

我想测试我的 kafka 消费者,但 @EmbddedKafka 有问题。

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventConsumer {

    private final CustomInterface customInterface;

    @KafkaListener(topics = "test-topic")
    public void consumeEvents(Event event) {
           customInterface.apply(event);
        }
    }
}

我的测试课如下

@EmbeddedKafka
@ExtendWith(MockitoExtension.class)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConsumerTest {

    private Producer<String, String> producer;

    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                                "event-file.json").toFile();

    private KafkaEventConsumer kafkaEventConsumer;

    @Mock
    private CustomInterface CustomInterface;


    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ObjectMapper objectMapper;

  

    @BeforeAll
    void setUp() {
        kafkaEventConsumer = new KafkaEventConsumer(customInterface);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
                                                     new StringSerializer()).createProducer();
    }

    @Test
    void consumeEvents() throws IOException, BadCurrencyException {
        var event = objectMapper.readValue(EVENT_JSON,Event.class);
        String message = objectMapper.writeValueAsString(event);
        producer.send(new ProducerRecord<>("test-topic", 0, "1", message));
        producer.flush();

        // Read the message and assert its properties
        verify(customeInterface, timeout(10000).times(1)).apply(any());
    }

    @AfterAll
    void shutdown() {
        producer.close();
    }

}

测试未通过,消费者没有拦截消息

Wanted but not invoked:
customInterface.apply(
     <any> );
Actually, there were zero interactions with this mock.

PS:我关注了这篇有趣的文章

spring-boot mockito spring-kafka
1个回答
1
投票

我用过

KafkaTemplate

@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
               partitions = 1,
               controlledShutdown = true)
class KafkaConsumerTest {

    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
                                                                "event-file.json").toFile();
    @Autowired
    KafkaTemplate<String, Event> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;


    @SpyBean
    private KafkaEvenConsumer kafkaEvenConsumer;

    @SpyBean
    private MyInterface myInterface;

    @Captor
    ArgumentCaptor<Event> eventCaptor;

    @Test
    @SneakyThrows
    @DirtiesContext
    void consumeEvents() {

        Event event = objectMapper.readValue(EVENT_JSON,                                                Event.class);
        kafkaTemplate.send("test-topic, "1", event);

        verify(kafkaEventConsumer,timeout(10000).times(1)).consumeEvents(eventCaptor.capture());
        Event argument = eventCaptor.getValue();
        // .. assert the message properties
        verify(myInterface, timeout(10000).times(1)).apply(any());
       
    }

}

© www.soinside.com 2019 - 2024. All rights reserved.