CompletableFuture.get() 在 JUnit 中抛出 java.util.concurrent.TimeoutException

问题描述 投票:0回答:0

我正在努力为我的 kafka 生产者编写一个 JUnit 测试

KafkaTemplate
CompletableFuture
。不调用
get()
的方法的类似测试工作正常。

这是我的kafka制作人课程(

MyProducer
):

@Service
public class MyProducer {

    private final Logger logger = LoggerFactory.getLogger(MyProducer.class);
    private KafkaTemplate<String, MyObject> kafkaTemplate;
    public MyProducer() {}

    @Autowired(required = false)
    public void setKafkaTemplate(KafkaTemplate<String, MyObject> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendEvent(MyObject message) {
        ListenableFuture<SendResult<String, MyObject>> future = kafkaTemplate.send("my-topic", message);
        future.addCallback(
                result -> logger.info("Everything is fine!"),
                exception -> logger.info("Ups!")
        );
    }

    public SendResult<String, MyObject> sendSpecialEvent(MyObject message) throws ExecutionException, InterruptedException,
            TimeoutException {
        return kafkaTemplate.send("my-topic", message).get(100, TimeUnit.MILLISECONDS);
    }
}

和测试类:

@ExtendWith(MockitoExtension.class)
public class MyProducerTest {

    private final String topic = "dummyString";
    @Mock
    private KafkaTemplate<String, MyObject> kafkaTemplate;
    @Mock
    private Logger logger;
    private CompletableFuture<SendResult<String, MyObject>> completableFuture;
    private MyObject myObject;
    private MyProducer myProducer;

    @BeforeEach
    void setUp() {
        myProducer = new MyProducer();
        ReflectionTestUtils.setField(myProducer, "logger", logger);
        myProducer.setKafkaTemplate(kafkaTemplate);

        myObject = mock(MyObject.class);
        completableFuture = new CompletableFuture<>();
        var futureAdapter = new CompletableToListenableFutureAdapter<>(completableFuture);
        when(kafkaTemplate.send(any(), any())).thenReturn(futureAdapter);
    }

    @Test
    void testSuccessCallback() {
        var successMessage = "OK Message";
        var offset = 1L;

        myProducer.sendEvent(myObject);
        completableFuture.complete(new SendResult<>(
                new ProducerRecord<>(topic, myObject),
                new RecordMetadata(new TopicPartition(topic, 1), offset, 0, 0, 0, 0)
        ));

        assertTrue(completableFuture.isDone());
        verify(logger).info(successMessage);
    }

    @Test
    void testFailureCallback() {
        var failureMessage = "KO Message";
        var exceptionMessage = "fakeException";

        myProducer.sendEvent(myObject);
        completableFuture.completeExceptionally(new RuntimeException(exceptionMessage));

        assertTrue(completableFuture.isCompletedExceptionally());
        verify(logger).info(failureMessage);
    }

    @Test
    void testSpecialSuccessCallback() throws ExecutionException, InterruptedException, TimeoutException {
        var offset = 1L;

        SendResult<String, MyObject> result = myProducer.sendSpecialEvent(myObject);
        completableFuture.complete(new SendResult<>(
                new ProducerRecord<>(topic, myObject),
                new RecordMetadata(new TopicPartition(topic, 1), offset, 0, 0, 0, 0)
        ));

        assertTrue(completableFuture.isDone());
        assertTrue(Objects.nonNull(result));
        assertTrue(Objects.equals(result.getProducerRecord().value(), myObject));
    }

    @Test
    void testSpecialFailureCallback() {
        // TODO: implement negative scenario
    }
}

前两个测试通过。第三个是抛出异常

java.util.concurrent.TimeoutException at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021) at org.springframework.util.concurrent.CompletableToListenableFutureAdapter.get(CompletableToListenableFutureAdapter.java:104)
.

现在我不知道如何处理最后一个测试用例......

java spring-boot junit completable-future
© www.soinside.com 2019 - 2024. All rights reserved.