Unit testing a RichSinkFunction in Flink


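I have a RichSinkFunction with a custom metric that is incremented for every record passed to invoke(). The problem is that I can't mock the Counter customMetric, because it is initialized in open().

How can I unit test this counter?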

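@Override
public void open(Configuration parameters) throws Exception {
    // The counter is registered here because the runtime context is only available from open() onwards.
    this.customMetric = getRuntimeContext().getMetricGroup().counter("myCounter");
}

@Override
public void invoke(MyData myData, Context context) throws Exception {
    this.customMetric.inc();
    // Assumes an SLF4J-style logger.
    LOG.info("myCounter = {}", this.customMetric.getCount());
}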

I am using JUnit 5 and Mockito. When I mock the counter it is still null; I can only mock objects that are passed to the constructor of the CustomRichSinkFunction class.

mocking mockito apache-flink
1 Answer

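I test metrics in the following ways:

1. Flink MiniCluster with a custom test MetricReporterFactory (the hardest, but the most faithful approach)

For the test you will need a custom implementation of org.apache.flink.metrics.reporter.MetricReporterFactory: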

public class MemorizingTestMetricReporterFactory implements MetricReporterFactory {

    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new MemorizingTestMetricReporter();
    }
}

This factory returns a new test class that implements org.apache.flink.metrics.reporter.MetricReporter:

public class MemorizingTestMetricReporter implements MetricReporter {
    private static final Map<String, Metric> metricsByName = new ConcurrentHashMap<>();

    public static Map<String, Metric> getRegisteredMetrics() {
        return new HashMap<>(metricsByName);
    }

    @Override
    public void open(MetricConfig config) {
        // no op
    }

    @Override
    public void close() {
        // no op
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metricsByName.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metricsByName.remove(metricName);
    }
}

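This class stores every metric that gets registered in a static collection.

To make the factory visible in tests, add a file under test/resources/META-INF/services/ named org.apache.flink.metrics.reporter.MetricReporterFactory whose content is <your_package>.MemorizingTestMetricReporterFactory

The next step is the test itself, which starts and configures the mini cluster: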

@Test
@SneakyThrows
void testByFlinkMiniCluster() {
    // given
    // TODO: create the Flink MiniCluster via a JUnit extension
    // configure minicluster
    final var miniClusterConfig = new Configuration();
    miniClusterConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + MetricOptions.REPORTER_FACTORY_CLASS.key(), MemorizingTestMetricReporterFactory.class.getName());
    final var miniCluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(miniClusterConfig)
                    .build());
    // start minicluster
    miniCluster.before();

    // when
    final var env = StreamExecutionEnvironment.getExecutionEnvironment();
    final var sourceData = List.of(MyData.builder().build());
    // you need some TestUnboundedSource here
    env.fromCollection(sourceData)
            .returns(MyData.class)
            .addSink(new MyRichSinkFunction());
    final var jobClient = env.executeAsync();

    // then
    await().atMost(ofSeconds(2))
            .until(() -> {
                // check your counter by static map in MemorizingTestMetricReporter.class
                final var myCounter = MemorizingTestMetricReporter.getRegisteredMetrics().get("myCounter");
                return myCounter != null && ((Counter) myCounter).getCount() == sourceData.size();
            });
    jobClient.cancel().get(2, TimeUnit.SECONDS);
    miniCluster.after();
}
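
The await()...until() call above appears to come from the Awaitility library (the answer does not name it). It is needed because executeAsync() returns before any record has reached the sink, so the counter has to be polled rather than asserted once. As a rough illustration of that polling idea in plain Java, here is a minimal sketch; the class PollingSketch and the helpers waitUntil and sleepQuietly are hypothetical names, and the background thread only stands in for the asynchronously running job.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

final class PollingSketch {

    /** Sleeps for the given time; returns false if the thread was interrupted. */
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    /**
     * Re-evaluates the condition every 'interval' until it holds or 'timeout' elapses.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            if (!sleepQuietly(interval.toMillis())) {
                return false; // interrupted while waiting
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for the metric that the asynchronous job would update.
        final AtomicLong processed = new AtomicLong();
        final Thread job = new Thread(() -> {
            sleepQuietly(100);
            processed.incrementAndGet();
        });
        job.start();

        final boolean reached =
                waitUntil(() -> processed.get() == 1, Duration.ofSeconds(2), Duration.ofMillis(20));
        System.out.println("counter reached expected value: " + reached);
    }
}
```

In a real test a library such as Awaitility is usually the better choice, since it also reports why the condition failed.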

2. Flink TestHarness (simple, but requires using Flink's test-utility abstractions)

See the TestHarness documentation for reference.

@Test
@SneakyThrows
void testByTestHarness() {
    // given
    // metric group for operators
    final var interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup();
    // metric group for task
    final var interceptingTaskMetricGroup = new InterceptingTaskMetricGroup() {
        @Override
        public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
            return interceptingOperatorMetricGroup;
        }
    };
    final var mockEnvironment = new MockEnvironmentBuilder().setMetricGroup(interceptingTaskMetricGroup).build();
    try (final var testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyRichSinkFunction()), mockEnvironment)) {
        testHarness.setup();
        testHarness.open();

        // when
        final var sourceData = List.of(new StreamRecord<>(MyData.builder().build()));
        testHarness.processElements(sourceData);

        // then
        final var myCounter = interceptingOperatorMetricGroup.get("myCounter");
        assertNotNull(myCounter, "Counter is null");
        assertEquals(1, ((Counter) myCounter).getCount());
    }
}

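3. Mocking (the worst approach)

You can use Mockito, but then the mocks are forced to know the internal implementation of the component, which contradicts the essence of testing.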

@Test
@SneakyThrows
void testByMock() {
    // given
    final var myRichSinkFunction = new MyRichSinkFunction();
    final var runtimeContext = mock(RuntimeContext.class);
    myRichSinkFunction.setRuntimeContext(runtimeContext);
    final var metricGroup = mock(OperatorMetricGroup.class);
    doReturn(metricGroup).when(runtimeContext).getMetricGroup();
    when(metricGroup.addGroup(any())).thenReturn(metricGroup);
    final var counter = new Counter() {
        private final AtomicLong count = new AtomicLong();
        @Override
        public void inc() {
            count.incrementAndGet();
        }

        @Override
        public void inc(long n) {
            count.addAndGet(n);
        }

        @Override
        public void dec() {
            count.decrementAndGet();
        }

        @Override
        public void dec(long n) {
            count.getAndUpdate(l -> l - n);
        }

        @Override
        public long getCount() {
            return count.get();
        }
    };
    when(metricGroup.counter(any())).thenReturn(counter);

    // when
    myRichSinkFunction.open(mock(Configuration.class));
    myRichSinkFunction.invoke(new MyData(), mock(SinkFunction.Context.class));

    // then
    final var counterCaptor = ArgumentCaptor.forClass(String.class);
    verify(metricGroup, times(1)).counter(counterCaptor.capture());
    assertEquals("myCounter", counterCaptor.getValue());

    assertEquals(1, counter.getCount());
}
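
The coupling mentioned for the mock-based approach can be seen in a stripped-down, dependency-free sketch. All types below (FakeCounter, FakeMetricGroup, RecordingMetricGroup, CountingSink) are hypothetical stand-ins and not Flink's real interfaces. The point is only that the test double has to reproduce exactly the call the sink makes in open(), and that open() must run before invoke() or the counter field is still null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class CountingSinkSketch {

    // Simplified stand-ins for illustration only; NOT Flink's Counter / MetricGroup.
    interface FakeCounter {
        void inc();
        long getCount();
    }

    interface FakeMetricGroup {
        FakeCounter counter(String name);
    }

    /** Test double that remembers every counter registered through it, keyed by name. */
    static final class RecordingMetricGroup implements FakeMetricGroup {
        final Map<String, FakeCounter> counters = new HashMap<>();

        @Override
        public FakeCounter counter(String name) {
            return counters.computeIfAbsent(name, n -> new FakeCounter() {
                private final AtomicLong count = new AtomicLong();

                @Override
                public void inc() {
                    count.incrementAndGet();
                }

                @Override
                public long getCount() {
                    return count.get();
                }
            });
        }
    }

    /** Hypothetical sink mirroring the question: the counter only exists after open(). */
    static final class CountingSink {
        private FakeCounter customMetric; // null until open() is called

        void open(FakeMetricGroup metricGroup) {
            this.customMetric = metricGroup.counter("myCounter");
        }

        void invoke(String record) {
            customMetric.inc();
        }
    }

    public static void main(String[] args) {
        final RecordingMetricGroup group = new RecordingMetricGroup();
        final CountingSink sink = new CountingSink();
        sink.open(group); // must run first, otherwise customMetric is still null
        sink.invoke("a");
        sink.invoke("b");
        System.out.println(group.counters.get("myCounter").getCount()); // prints 2
    }
}
```

If the sink later registered its counter under a sub-group or a different name, this test double would have to change as well, which is the drawback described above.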