我有一个
Spring
消费者,其 @KafkaListener
:
@Service
@Slf4j
public class EventListener {
@KafkaListener(topics = "topic", groupId = "group-id")
public void consumer(TheEvent event) {
log.info("Received the event in {}", event.toString());
}
}
为了测试,我们使用
SpockFramework
,我想验证消费者是否收到了发布的消息。在 Spock
文档中,我发现 PollingConditions
然而,它似乎不适用于变化。
@EmbeddedKafka
class KafkaSpec extend Specification {
@Autowired
Producer<String, TheEvent> theEventProducer
@SpringSpy
EventListener eventListener
def "produce and consume event successfully"() {
given: "An event"
var event = MockEventFactory.createEvent()
when: "A producer sends the event"
this.theEventProducer.send(new ProducerRecord<String, TheEvent>("topic", "123", event))
then: "Consumer reads event eventually"
new PollingConditions(timeout: 10).eventually {
1 * eventListener.consumer(event)
}
}
}
问题是,第一次迭代检查条件后的
PollingCondition
将失败并出现以下错误,并将等待消费者!
Too few invocations for:
1 * eventListener.consumer(event) (0 invocations)
Unmatched invocations (ordered by similarity):
在这一步之后,我想检查系统功能并测试其他部分逻辑是否已正确执行,但是,首先我需要等待接收事件。
您无法在
PollingConditions
块中执行模拟断言。您需要使用 CountDownLatch
或类似的东西。请参阅https://stackoverflow.com/a/52935749/2145769
def "produce and consume event successfully"() {
given: "An event"
var event = MockEventFactory.createEvent()
var latch = new CountDownLatch(1)
when: "A producer sends the event"
this.theEventProducer.send(new ProducerRecord<String, TheEvent>("topic", "123", event))
and:
latch.await(10, TimeUnit.SECONDS) // must be in when-block, use timeout variant
then: "Consumer reads event eventually"
1 * eventListener.consumer(event) >> {
latch.countDown()
}
}