验证@KafkaListener在Kafka生产者发送到Spock后是否被调用

问题描述 投票:0回答:1

我有一个

Spring
消费者,其
@KafkaListener

@Service
@Slf4j
public class EventListener {

    @KafkaListener(topics = "topic", groupId = "group-id")
    public void consumer(TheEvent event) {
        log.info("Received the event in {}", event.toString());
    }
}

为了测试,我们使用

SpockFramework
,我想验证消费者是否收到了发布的消息。在
Spock
文档中,我发现
PollingConditions
然而,它似乎不适用于变化。

@EmbeddedKafka
class KafkaSpec extend Specification {
    @Autowired
    Producer<String, TheEvent> theEventProducer
    
    @SpringSpy
    EventListener eventListener

    def "produce and consume event successfully"() {
        given: "An event"
        var event = MockEventFactory.createEvent()

        when: "A producer sends the event"
        this.theEventProducer.send(new ProducerRecord<String, TheEvent>("topic", "123", event))

        then: "Consumer reads event eventually"
        new PollingConditions(timeout: 10).eventually {
            1 * eventListener.consumer(event)
        }
    }
}

问题是,第一次迭代检查条件后的

PollingCondition
将失败并出现以下错误,并将等待消费者!

Too few invocations for:

1 * eventListener.consumer(event)   (0 invocations)

Unmatched invocations (ordered by similarity):

在这一步之后,我想检查系统功能并测试其他部分逻辑是否已正确执行,但是,首先我需要等待接收事件。

java groovy kafka-consumer-api spock spock-spy
1个回答
0
投票

您无法在

PollingConditions
块中执行模拟断言。您需要使用
CountDownLatch
或类似的东西。请参阅https://stackoverflow.com/a/52935749/2145769

    def "produce and consume event successfully"() {
        given: "An event"
        var event = MockEventFactory.createEvent()
        var latch = new CountDownLatch(1)

        when: "A producer sends the event"
        this.theEventProducer.send(new ProducerRecord<String, TheEvent>("topic", "123", event))

        and:
        latch.await(10, TimeUnit.SECONDS) // must be in when-block, use timeout variant

        then: "Consumer reads event eventually"
        1 * eventListener.consumer(event) >> {
            latch.countDown()
        }        
    }
© www.soinside.com 2019 - 2024. All rights reserved.