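I have a RichSinkFunction with a custom metric that is incremented for every record passed to invoke(). The problem is that I can't mock the Counter customMetric, because it is initialized in open().
How can I unit test this counter?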
@Override
public void open(Configuration parameters) throws Exception {
    // The counter is only created here, so the field is null until open() has run.
    this.customMetric = getRuntimeContext().getMetricGroup().counter("myCounter");
}

@Override
public void invoke(MyData myData, Context context) throws Exception {
    this.customMetric.inc();
    LOG.info("myCounter = {}", this.customMetric.getCount());
}
I'm using JUnit 5 and Mockito. The mocked counter still ends up null; I can only mock objects that are passed to the constructor of the CustomRichSinkFunction class.
I test metrics in the following ways:
For the test you will need a custom implementation of org.apache.flink.metrics.reporter.MetricReporterFactory:
public class MemorizingTestMetricReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new MemorizingTestMetricReporter();
    }
}
This factory returns an instance of a new test class that implements org.apache.flink.metrics.reporter.MetricReporter:
public class MemorizingTestMetricReporter implements MetricReporter {
    private static final Map<String, Metric> metricsByName = new ConcurrentHashMap<>();

    public static Map<String, Metric> getRegisteredMetrics() {
        return new HashMap<>(metricsByName);
    }

    @Override
    public void open(MetricConfig config) {
        // no op
    }

    @Override
    public void close() {
        // no op
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metricsByName.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metricsByName.remove(metricName);
    }
}
This class stores every metric that gets registered in a static collection, so the test can look it up later by name.
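To see the static-registry idea in isolation, here is a minimal, dependency-free sketch. DemoCounter, DemoMemorizingReporter and DemoMemorizingReporterUsage are hypothetical stand-ins written for this illustration; they are not Flink classes and only mirror the shape of the reporter above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a metric; not Flink's Counter. */
final class DemoCounter {
    private final AtomicLong count = new AtomicLong();

    void inc() {
        count.incrementAndGet();
    }

    long getCount() {
        return count.get();
    }
}

/** Hypothetical stand-in for the reporter: remembers registered metrics in a static map. */
final class DemoMemorizingReporter {
    private static final Map<String, DemoCounter> METRICS = new ConcurrentHashMap<>();

    private DemoMemorizingReporter() {}

    static void notifyOfAddedMetric(String name, DemoCounter metric) {
        METRICS.put(name, metric);
    }

    static void notifyOfRemovedMetric(String name) {
        METRICS.remove(name);
    }

    /** Returns a copy of the map; the counters inside the copy are still the live objects. */
    static Map<String, DemoCounter> getRegisteredMetrics() {
        return new HashMap<>(METRICS);
    }
}

final class DemoMemorizingReporterUsage {
    private DemoMemorizingReporterUsage() {}

    /** Registers a counter, takes a snapshot, increments twice, unregisters, reads the snapshot. */
    static long run() {
        DemoCounter counter = new DemoCounter();
        DemoMemorizingReporter.notifyOfAddedMetric("myCounter", counter);
        Map<String, DemoCounter> snapshot = DemoMemorizingReporter.getRegisteredMetrics();
        counter.inc();
        counter.inc();
        DemoMemorizingReporter.notifyOfRemovedMetric("myCounter");
        // The snapshot keeps its entry after removal and sees the later increments.
        return snapshot.get("myCounter").getCount();
    }
}
```

Because the map is static, it is shared by every test in the same JVM, so metric names from one test can leak into another unless each test uses distinct names or clears the state.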
To make this factory visible in tests, add a file under the path
test/resources/META-INF/services/
named org.apache.flink.metrics.reporter.MetricReporterFactory
with the content <your_package>.MemorizingTestMetricReporterFactory
The next step is the test itself, which starts and configures a mini cluster:
@Test
@SneakyThrows
void testByFlinkMiniCluster() {
    // given
    // TODO: create the Flink MiniCluster via a JUnit extension
    // configure the mini cluster so that it loads the test reporter
    final var miniClusterConfig = new Configuration();
    miniClusterConfig.setString(
            ConfigConstants.METRICS_REPORTER_PREFIX + "test." + MetricOptions.REPORTER_FACTORY_CLASS.key(),
            MemorizingTestMetricReporterFactory.class.getName());
    final var miniCluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(miniClusterConfig)
                    .build());
    // start the mini cluster
    miniCluster.before();

    // when
    final var env = StreamExecutionEnvironment.getExecutionEnvironment();
    final var sourceData = List.of(MyData.builder().build());
    // you will need some TestUnboundedSource here
    env.fromCollection(sourceData)
            .returns(MyData.class)
            .addSink(new MyRichSinkFunction());
    final var jobClient = env.executeAsync();

    // then
    await().atMost(ofSeconds(2))
            .until(() -> {
                // check the counter through the static map in MemorizingTestMetricReporter
                final var myCounter = MemorizingTestMetricReporter.getRegisteredMetrics().get("myCounter");
                return myCounter != null && ((Counter) myCounter).getCount() == sourceData.size();
            });
    jobClient.cancel().get(2, TimeUnit.SECONDS);
    miniCluster.after();
}
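The job above runs asynchronously, so the assertion has to poll until the counter reaches the expected value or a timeout expires. The following is a rough, dependency-free sketch of that polling idea. PollingAwait, awaitUntil and demo are hypothetical names made up for this illustration; it does not reproduce how Awaitility's await() is implemented.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

/** Hypothetical helper; not part of Flink or Awaitility. */
final class PollingAwait {
    private PollingAwait() {}

    /**
     * Re-checks the condition every pollMillis milliseconds.
     * Returns true as soon as the condition holds, false once the timeout has elapsed
     * or the waiting thread is interrupted.
     */
    static boolean awaitUntil(Duration timeout, long pollMillis, BooleanSupplier condition) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Simulates an asynchronous job that increments a counter shortly after it starts. */
    static boolean demo() {
        final AtomicLong counter = new AtomicLong();
        final Thread job = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            counter.incrementAndGet();
        });
        job.setDaemon(true);
        job.start();
        return awaitUntil(Duration.ofSeconds(2), 10, () -> counter.get() == 1);
    }
}
```

Asserting immediately after executeAsync() would race with the job, which is why the test waits on a condition instead of reading the counter once.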
Using a test harness (see the TestHarness documentation for reference):
@Test
@SneakyThrows
void testByTestHarness() {
    // given
    // metric group for operators
    final var interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup();
    // metric group for task
    final var interceptingTaskMetricGroup = new InterceptingTaskMetricGroup() {
        @Override
        public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
            return interceptingOperatorMetricGroup;
        }
    };
    final var mockEnvironment = new MockEnvironmentBuilder().setMetricGroup(interceptingTaskMetricGroup).build();
    try (final var testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyRichSinkFunction()), mockEnvironment)) {
        testHarness.setup();
        testHarness.open();

        // when
        final var sourceData = List.of(new StreamRecord<>(MyData.builder().build()));
        testHarness.processElements(sourceData);

        // then
        final var myCounter = interceptingOperatorMetricGroup.get("myCounter");
        assertNotNull(myCounter, "Counter is null");
        assertEquals(1, ((Counter) myCounter).getCount());
    }
}
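You can use Mockito, but then you are forced to mock based on knowledge of the component's internal implementation, which goes against the spirit of testing: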
@Test
@SneakyThrows
void testByMock() {
    // given
    final var myRichSinkFunction = new MyRichSinkFunction();
    final var runtimeContext = mock(RuntimeContext.class);
    myRichSinkFunction.setRuntimeContext(runtimeContext);
    final var metricGroup = mock(OperatorMetricGroup.class);
    doReturn(metricGroup).when(runtimeContext).getMetricGroup();
    when(metricGroup.addGroup(any())).thenReturn(metricGroup);
    final var counter = new Counter() {
        private final AtomicLong count = new AtomicLong();

        @Override
        public void inc() {
            count.incrementAndGet();
        }

        @Override
        public void inc(long n) {
            count.addAndGet(n);
        }

        @Override
        public void dec() {
            count.decrementAndGet();
        }

        @Override
        public void dec(long n) {
            count.getAndUpdate(l -> l - n);
        }

        @Override
        public long getCount() {
            return count.get();
        }
    };
    when(metricGroup.counter(any())).thenReturn(counter);

    // when
    myRichSinkFunction.open(mock(Configuration.class));
    myRichSinkFunction.invoke(new MyData(), mock(SinkFunction.Context.class));

    // then
    final var counterCaptor = ArgumentCaptor.forClass(String.class);
    verify(metricGroup, times(1)).counter(counterCaptor.capture());
    assertEquals("myCounter", counterCaptor.getValue());
    assertEquals(1, counter.getCount());
}
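The anonymous Counter in the Mockito test can also be pulled out into a small reusable test double. Below is a minimal sketch under that assumption. DemoCountable and AtomicDemoCounter are hypothetical names; the interface only mirrors the five methods overridden above and is not Flink's Counter. Flink also ships org.apache.flink.metrics.SimpleCounter, which may be enough when the test touches the counter from a single thread.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in mirroring the five counter methods used in the test above. */
interface DemoCountable {
    void inc();

    void inc(long n);

    void dec();

    void dec(long n);

    long getCount();
}

/** Counter test double backed by an AtomicLong, so concurrent updates are not lost. */
final class AtomicDemoCounter implements DemoCountable {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public void inc(long n) {
        count.addAndGet(n);
    }

    @Override
    public void dec() {
        count.decrementAndGet();
    }

    @Override
    public void dec(long n) {
        // Same effect as getAndUpdate(l -> l - n), expressed as a single atomic add.
        count.addAndGet(-n);
    }

    @Override
    public long getCount() {
        return count.get();
    }

    /** Increments one shared counter from several threads and returns the final value. */
    static long countFromThreads(int threads, int incrementsPerThread) {
        final AtomicDemoCounter counter = new AtomicDemoCounter();
        final Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getCount();
    }
}
```

A named class like this keeps the test body short and can be shared between the harness-based and the mock-based tests.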