使用 Kafka 进行模拟单元测试 - 回调模拟生产者

问题描述 投票:0回答:1

我想围绕调用 Kafka 设置一些测试。我的 Kafka 调用有以下回调设置,请参见下文:

@Async // allows function to be async. the config also has to be set
@Override
public void publishEventToTopic() {
        ListenableFuture<SendResult<String, KafkaRequest>> future = kafkaTemplate.send(topic, request);

        future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaRequest>>() {
            @Override
            public void onSuccess(SendResult<String, KafkaRequest> result) {
                log.info("Success");
            }

            @Override
            public void onFailure(Throwable ex) {
                log.info("Failure");
            }
        });
}

我想做的是测试这两种场景(onSuccess 和 onFailure)。我尝试遵循这个SO问题,但我遇到了一个错误,其中显示“主题不能为空”。这是我尝试过的:

@Test
public void test2() {
    String key = "test1";
    String topic = "sender.t";
    long offset = 1;
    int partition = 0;

    SendResult<String, Object> sendResult = mock(SendResult.class);
    ListenableFuture<SendResult<String, KafkaRequest>> responseFuture = mock(ListenableFuture.class);
    RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition(topic, partition), offset, 0L, 0L, 0L, 0, 0);
    ProducerRecord producerRecord = new ProducerRecord(topic, partition, key, "");

    given(sendResult.getRecordMetadata()).willReturn(recordMetadata);
    given(sendResult.getProducerRecord()).willReturn(producerRecord);

    when(template.send(any(), any())).thenReturn(responseFuture);

    doAnswer(invocationOnMock -> {
        ListenableFutureCallback listenableFutureCallback = invocationOnMock.getArgument(0);
        listenableFutureCallback.onFailure(throwable);
        return null;
    })
    .when(responseFuture).addCallback(any()); //this is the line that is throwing an error when trying to debug.


    publisher.publishEventToTopic();
}

我不太确定这意味着什么,所以几个小时后,我决定尝试不同的路径 - 使用 MockProducer 类。它似乎有效,但并不一致:

@Test
public void test3() throws InterruptedException {
    ListenableFuture<SendResult<String, KafkaRequest>> responseErrorFuture = mock(ListenableFuture.class);
    MockProducer mockProducer = new MockProducer(Cluster.empty(), false, null, null, null);
    mockProducer.clear();
    mockProducer.errorNext(new RuntimeException("Offset commit failed on partition my-topic-2-9 at offset 0")); //this doesn't work anymore.

    publisher.publishEventToTopic();
}

然后我尝试将其添加到其中,但它抱怨我没有嘲笑正确的事情:

        doAnswer(invocation -> {
        ListenableFutureCallback listenableFutureCallback = invocation.getArgument(0);
        mockProducer.errorNext(new RuntimeException("Offset commit failed on partition my-topic-2-9 at offset 0"));
        return null;
    }).when(mockProducer).send(any(), any());

我的问题是,当我发送 Kafka 请求时,是否有人知道我做错了什么而没有测试回调的 onFailure 方法?

谢谢!

apache-kafka callback future spring-kafka spring-kafka-test
1个回答
0
投票

您能分享一下解决方案吗?

© www.soinside.com 2019 - 2024. All rights reserved.