我正在努力为我的 kafka 生产者编写一个 JUnit 测试
KafkaTemplate
和 CompletableFuture
。不调用 get()
的方法的类似测试工作正常。
这是我的kafka制作人课程(
MyProducer
):
@Service
public class MyProducer {
private final Logger logger = LoggerFactory.getLogger(MyProducer.class);
private KafkaTemplate<String, MyObject> kafkaTemplate;
public MyProducer() {}
@Autowired(required = false)
public void setKafkaTemplate(KafkaTemplate<String, MyObject> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendEvent(MyObject message) {
ListenableFuture<SendResult<String, MyObject>> future = kafkaTemplate.send("my-topic", message);
future.addCallback(
result -> logger.info("Everything is fine!"),
exception -> logger.info("Ups!")
);
}
public SendResult<String, MyObject> sendSpecialEvent(MyObject message) throws ExecutionException, InterruptedException,
TimeoutException {
return kafkaTemplate.send("my-topic", message).get(100, TimeUnit.MILLISECONDS);
}
}
和测试类:
@ExtendWith(MockitoExtension.class)
public class MyProducerTest {
private final String topic = "dummyString";
@Mock
private KafkaTemplate<String, MyObject> kafkaTemplate;
@Mock
private Logger logger;
private CompletableFuture<SendResult<String, MyObject>> completableFuture;
private MyObject myObject;
private MyProducer myProducer;
@BeforeEach
void setUp() {
myProducer = new MyProducer();
ReflectionTestUtils.setField(myProducer, "logger", logger);
myProducer.setKafkaTemplate(kafkaTemplate);
myObject = mock(MyObject.class);
completableFuture = new CompletableFuture<>();
var futureAdapter = new CompletableToListenableFutureAdapter<>(completableFuture);
when(kafkaTemplate.send(any(), any())).thenReturn(futureAdapter);
}
@Test
void testSuccessCallback() {
var successMessage = "OK Message";
var offset = 1L;
myProducer.sendEvent(myObject);
completableFuture.complete(new SendResult<>(
new ProducerRecord<>(topic, myObject),
new RecordMetadata(new TopicPartition(topic, 1), offset, 0, 0, 0, 0)
));
assertTrue(completableFuture.isDone());
verify(logger).info(successMessage);
}
@Test
void testFailureCallback() {
var failureMessage = "KO Message";
var exceptionMessage = "fakeException";
myProducer.sendEvent(myObject);
completableFuture.completeExceptionally(new RuntimeException(exceptionMessage));
assertTrue(completableFuture.isCompletedExceptionally());
verify(logger).info(failureMessage);
}
@Test
void testSpecialSuccessCallback() throws ExecutionException, InterruptedException, TimeoutException {
var offset = 1L;
SendResult<String, MyObject> result = myProducer.sendSpecialEvent(myObject);
completableFuture.complete(new SendResult<>(
new ProducerRecord<>(topic, myObject),
new RecordMetadata(new TopicPartition(topic, 1), offset, 0, 0, 0, 0)
));
assertTrue(completableFuture.isDone());
assertTrue(Objects.nonNull(result));
assertTrue(Objects.equals(result.getProducerRecord().value(), myObject));
}
@Test
void testSpecialFailureCallback() {
// TODO: implement negative scenario
}
}
前两个测试通过。第三个是抛出异常
java.util.concurrent.TimeoutException at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) at org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get(CompletableToListenableFutureAdapter.java:104)
.
现在我不知道如何处理最后一个测试用例......