TestProcessingTimeService 和 InternalTimeServiceManagerImpl 有什么区别?

问题描述 投票:0回答:1

使用

ProcessFunctionTestHarnesses

对 Apache Flink 进行单元测试时

KeyedOneInputStreamOperatorTestHarness<String, Event, Alert> testHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunction

testHarness
可以访问2个类来获取键控过程函数中活动计时器的数量。

  1. int i = testHarness.getProcessingTimeService().getNumActiveTimers()
  2. int k = testHarness.numProcessingTimeTimers()

我查看了TestProcessingTimeServiceInternalTimeServiceManager的java文档,但描述并没有说太多。

两者黑白有什么区别,应该使用哪一个?

在我的 KeyedProcessFunction 中,我设置然后删除一个处理计时器。在我的 UT 结束时,我预计将有 0 个活动处理计时器。从上面的代码片段来看,

k
通过了等于
0
的断言,但是
i
失败了,它的实际值为
1

java unit-testing apache-flink
1个回答
0
投票

我认为这里的关键区别在于支持每个调用的实际服务,其中一个使用实际的测试特定实例,另一个使用内部计时器服务。两者都可能完成相同的事情,但实现可能会有所不同。

testHarness.getProcessingTimeService().getNumActiveTimers()

这使用您之前提到的

TestProcessingTimeService
的实例,并简单地迭代队列以获取队列中未“完成”的条目数。我们可以看到根据来源

@VisibleForTesting
public int numProcessingTimeTimers() {
    if (timeServiceManager != null) {
        return timeServiceManager.numProcessingTimeTimers();
    } else {
        throw new UnsupportedOperationException();
    }
}

testHarness.numProcessingTimeTimers()

这使用

InternalTimeServiceManager<K>
的实际实例,而不是特定于测试的计时器对象根据源

@VisibleForTesting
public int numProcessingTimeTimers() {
    int count = 0;
    for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
        count += timerService.numProcessingTimeTimers();
    }
    return count;
}

根据我的经验,在编写围绕时间/计时器的测试时,可能有必要在删除/删除计时器后确保其正确完成,并且时间可能需要在删除后提前以确保传播预期的更改到服务(例如,完成计时器后提前时间,以确保底层服务识别出它不再“活动”)。 这也完全有可能是

TestProcessingTimeService

的实现中的一个错误,导致它在断言之前无法立即识别出计时器的删除。

    

© www.soinside.com 2019 - 2024. All rights reserved.