如何使用 Mockito 对 Kafka 的 RecordIntercept 接口的拦截方法进行单元测试

问题描述 投票:0回答:1

我有 KafkaRecordInterceptor 类,它实现了 RecordInterceptor 接口 如下

@Component
public class KafkaRecordInterceptor implements RecordInterceptor<String, Message<Object>> {
    public static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
    private final List<MessageContextListener> messageContextListener;
    private final MetricsUtils metricsUtils;
    private final SystemClock systemClock;
...

然后我重写拦截方法如下

    @Override
    public ConsumerRecord intercept(@NonNull ConsumerRecord consumerRecord, @NonNull Consumer consumer) {

        MessageContext messageContext = new MessageContext();
        messageContext.setStartClock(systemClock.now());
        messageContextThreadLocal.set(messageContext);

        Optional<MessagePriority> messagePriorityOptional =
            Objects.requireNonNull(metricsUtils.getMessagePriority(consumerRecord));

        messageContext.setCorrelationId(KafkaUtils.extractCorrelationId(consumerRecord));
        messageContext
            .setOperationType(((GenericMessage<?>) consumerRecord.value()).getPayload().getClass().getSimpleName());
        messageContext.setPriority(messagePriorityOptional.map(MessagePriority::name).orElse(""));

        messageContextListener.forEach(listener -> listener.onStart(messageContext));

        return Objects.requireNonNull(consumerRecord);
    }

我也覆盖

    @Override
    public void afterRecord(ConsumerRecord record, Consumer consumer) {
        MessageContext messageContext = messageContextThreadLocal.get();
        messageContextListener.forEach(listener -> listener.onClear(messageContext));
        messageContextThreadLocal.remove();
    }

我的问题是,对拦截器方法和 afterRecord 进行单元测试的最佳方法是什么?

apache-kafka mockito spring-kafka junit5 spring-kafka-test
1个回答
0
投票

看起来你可以在单元测试中

new KafkaRecordInterceptor()
。 为其方法调用提供
ConsumerRecord
Consumer
的模拟,然后验证最终预期的交互。

© www.soinside.com 2019 - 2024. All rights reserved.