我正在使用 Flink 1.15 DataStream api 来执行 ETL 工作。我想设置我的作业集BATCH执行模式,所以我使用官方网站中提供的代码。
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
但是,我遇到了以下错误:
java.lang.UnsupportedOperationException at org.apache.flink.runtime.io.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)
我的整个代码逻辑
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`
DataStream<OutputType> result = text
.map(/* map logic here */ )
.keyBy(/* keyby logic here */)
.reduce(/* reduce logic here */)
result.writeAsText("filePath")
任何人都可以提供一些有关我为何收到此错误以及如何解决该错误的见解吗? 谢谢!
我的项目背景(如果你想了解更多为什么我要使用批处理模式):
我目前正在从事一项工作,从 S3 读取数据,使用键对数据执行一些转换和缩减。 在此过程中,我遇到一个问题,我的应用程序似乎存储每个中间缩减结果,而不仅仅是每个键的最终缩减值。我知道这可能是由于流式执行的性质造成的,流式执行在事件到达时不断处理事件。我的情况与这篇文章非常相似:https://stackoverflow.com/questions/58828218/how-to-avoid-单词计数中的重复键元组-w-apache-flink
所以我想改成批处理模式看看是否有效。
我尝试的:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`
text.writeAsText("filePath")
我可以通过禁用检查点来解决这个问题。这是特定于在 aws Managed Flink 中运行它的。在 terraform 中我添加了这个
flink_application_configuration {
checkpoint_configuration {
configuration_type = "CUSTOM"
checkpointing_enabled = false // disable checkpoints since it is a batch job
}
aws_kinesisanalyticsv2_application 资源内部。