我一直在尝试使用窗口类型 TumblingProcessingTimeWindows 来测试 apache flink 中的 windowAll 操作,但是当我尝试从接收器检索结果时,它们始终为 0。使用 TumblingProcessingTimeWindows 是强制性的,并且是规范的一部分,因此我不能使用由事件。具体来说,我有以下流程:
public void executeStream() {
var stream = env.addSource(convertToSourceFunction)
.returns(new TypeHint<SomeObject>() {})
.windowAll(some window of type TumblingProcessingTimeWindows)
.process(a class of type ProcessAllWindowFunction<SomeObject, ListOfObjects, TimeWindow> );
.addSink(myKafkaSink);
env.execute();
}
当尝试测试上面的代码时,我创建了以下资源:
private FromElementsFunction<SomeObject> convertToSourceFunction(SomeObject element) {
def hint = new TypeHint<SomeObject>(){}
return new FromElementsFunction<>(hint.typeInfo.createSerializer(env.getConfig()), element)
}
class MyKafkaSink implements SinkFunction<ListOfObjects> {
static def values = []
@Override
void invoke(ListOfObjects value, Context context) throws Exception {
values << value
}
}
当尝试比较和验证接收器中是否只有一个元素时,我在以下检查中得到错误:
MyKafkaSink.values.size == 1 //real value is 0
当然,如果我不使用窗口函数,一切都会按预期工作。测试这样的案例的正确方法是什么?我正在使用 Spock 来测试 Java 代码。
您的测试工作几乎肯定会在窗口有机会触发之前完成。测试依赖于处理时间的应用程序可能具有挑战性,因为 Flink 在退出之前不会等待处理时间计时器触发。
您需要找到某种方法来防止作业过早退出,和/或人为地提前时钟,以便窗口有机会触发。
值得一提的是,Flink 提供的测试工具有一个
setProcessingTime
方法可以处理此问题,但它们仅支持单个算子的单元测试,而不支持端到端集成测试。