使用 windowAll 测试 flink 流

问题描述 投票:0回答:1

我一直在尝试使用窗口类型 TumblingProcessingTimeWindows 来测试 apache flink 中的 windowAll 操作,但是当我尝试从接收器检索结果时,它们始终为 0。使用 TumblingProcessingTimeWindows 是强制性的,并且是规范的一部分,因此我不能使用由事件。具体来说,我有以下流程:

public void executeStream() {

  var stream = env.addSource(convertToSourceFunction)
              .returns(new TypeHint<SomeObject>() {})
              .windowAll(some window of type TumblingProcessingTimeWindows)
              .process(a class of type ProcessAllWindowFunction<SomeObject, ListOfObjects, TimeWindow> );
              .addSink(myKafkaSink);
      env.execute();
}

当尝试测试上面的代码时,我创建了以下资源:

    private FromElementsFunction<SomeObject> convertToSourceFunction(SomeObject element) {
        def hint = new TypeHint<SomeObject>(){}
        return new FromElementsFunction<>(hint.typeInfo.createSerializer(env.getConfig()), element)
    }
class MyKafkaSink implements SinkFunction<ListOfObjects> {
    static def values = []

    @Override
    void invoke(ListOfObjects value, Context context) throws Exception {
        values << value
    }
}

当尝试比较和验证接收器中是否只有一个元素时,我在以下检查中得到错误:

MyKafkaSink.values.size == 1 //real value is 0

当然,如果我不使用窗口函数,一切都会按预期工作。测试这样的案例的正确方法是什么?我正在使用 Spock 来测试 Java 代码。

apache-flink
1个回答
0
投票

您的测试工作几乎肯定会在窗口有机会触发之前完成。测试依赖于处理时间的应用程序可能具有挑战性,因为 Flink 在退出之前不会等待处理时间计时器触发。

您需要找到某种方法来防止作业过早退出,和/或人为地提前时钟,以便窗口有机会触发。

值得一提的是,Flink 提供的测试工具有一个

setProcessingTime
方法可以处理此问题,但它们仅支持单个算子的单元测试,而不支持端到端集成测试。

© www.soinside.com 2019 - 2024. All rights reserved.