我有 KafkaRecordInterceptor 类,它实现了 RecordInterceptor 接口
@Component
public class KafkaRecordInterceptor implements RecordInterceptor<String, Message<Object>> {
public static final ThreadLocal<MessageContext> messageContextThreadLocal = new ThreadLocal<>();
private final List<MessageContextListener> messageContextListener;
private final MetricsUtils metricsUtils;
private final SystemClock systemClock;
...
然后我重写拦截方法如下
@Override
public ConsumerRecord intercept(@NonNull ConsumerRecord consumerRecord, @NonNull Consumer consumer) {
MessageContext messageContext = new MessageContext();
messageContext.setStartClock(systemClock.now());
messageContextThreadLocal.set(messageContext);
Optional<MessagePriority> messagePriorityOptional =
Objects.requireNonNull(metricsUtils.getMessagePriority(consumerRecord));
messageContext.setCorrelationId(KafkaUtils.extractCorrelationId(consumerRecord));
messageContext
.setOperationType(((GenericMessage<?>) consumerRecord.value()).getPayload().getClass().getSimpleName());
messageContext.setPriority(messagePriorityOptional.map(MessagePriority::name).orElse(""));
messageContextListener.forEach(listener -> listener.onStart(messageContext));
return Objects.requireNonNull(consumerRecord);
}
我也覆盖
@Override
public void afterRecord(ConsumerRecord record, Consumer consumer) {
MessageContext messageContext = messageContextThreadLocal.get();
messageContextListener.forEach(listener -> listener.onClear(messageContext));
messageContextThreadLocal.remove();
}
我的问题是,对拦截器方法和 afterRecord 进行单元测试的最佳方法是什么?
看起来你可以在单元测试中
new KafkaRecordInterceptor()
。
为其方法调用提供 ConsumerRecord
和 Consumer
的模拟,然后验证最终预期的交互。