Flink 1.15:为 DataStream API 设置 BATCH 执行模式时出错

问题描述 投票:0回答:1

我正在使用 Flink 1.15 DataStream api 来执行 ETL 工作。我想设置我的作业集BATCH执行模式,所以我使用官方网站中提供的代码。

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
但是,我遇到了以下错误:

 java.lang.UnsupportedOperationException at org.apache.flink.runtime.io.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)

我的整个代码逻辑

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`

DataStream<OutputType> result = text
.map(/* map logic here */ )
.keyBy(/* keyby logic here */)
.reduce(/* reduce logic here */)

result.writeAsText("filePath")

任何人都可以提供一些有关我为何收到此错误以及如何解决该错误的见解吗? 谢谢!


我的项目背景(如果你想了解更多为什么我要使用批处理模式):

我目前正在从事一项工作,从 S3 读取数据,使用键对数据执行一些转换和缩减。 在此过程中,我遇到一个问题,我的应用程序似乎存储每个中间缩减结果,而不仅仅是每个键的最终缩减值。我知道这可能是由于流式执行的性质造成的,流式执行在事件到达时不断处理事件。我的情况与这篇文章非常相似:https://stackoverflow.com/questions/58828218/how-to-avoid-单词计数中的重复键元组-w-apache-flink

所以我想改成批处理模式看看是否有效。


我尝试的:

  1. 我删除了转换逻辑,但仍然出现与上面相同的错误:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`

text.writeAsText("filePath")
apache-flink flink-streaming flink-batch
1个回答
0
投票

我可以通过禁用检查点来解决这个问题。这是特定于在 aws Managed Flink 中运行它的。在 terraform 中我添加了这个

flink_application_configuration {
      checkpoint_configuration {
        configuration_type    = "CUSTOM"
        checkpointing_enabled = false // disable checkpoints since it is a batch job
      }

aws_kinesisanalyticsv2_application 资源内部。

© www.soinside.com 2019 - 2024. All rights reserved.