flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

使用连接器在 kubernetes 上部署 pyflink(kafka/kinesis)

我正在尝试找到一种使用 k8s 运算符在 k8s 上部署 pyflink 的方法。我已经能够使用 k8s Operator 上传作业,但我找不到如何向其添加连接器(例如 kafka-

回答 1 投票 0

org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema 无法转换为 ObjectNode

当我使用以下代码时: KafkaSource源= KafkaSource.builder() .setProperties(kafkaProps) .setProperty("ssl.truststore.

回答 2 投票 0

获取java.lang.NoClassDefFoundError:org/apache/flink/shaded/guava30/com/google/common/collect/ImmutableList

我使用的是 flink 1.17.1 和 java 版本 11 。 我遇到以下错误 引起:java.lang.ClassNotFoundException:org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList 需要解决方案...

回答 1 投票 0

由于 state.checkpoints.num-retained 配置导致触发 Flink 检查点(S3 后端)延迟

以下是我的 Flink 检查点配置,我们有 S3 作为后端。我们正在 EMR 集群中运行这个 flink 作业(版本:1.17.0) 检查点间隔:70000 检查点之间的最小暂停时间:15000 最大-

回答 1 投票 0

datastream的结果可以输出到List或者Map吗?

我希望通过一个流读取数据库表中的ID字段,将该字段存储到列表中,然后根据这个ID创建一个新的流来过滤对应的数据并同步到...

回答 1 投票 0

如何从代码中的保存点重新启动 flink

我有一个java类,正在向flink集群提交sql文件。 我有 StreamExecutionEnvironment StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

回答 4 投票 0

一个操作员如何将事件流传递给另一个操作员?

如果在 GoLang 中实现,操作员一可以通过 GoLang 通道在管道中将事件数据传递给操作员二。这有助于避免在运行时失败时出现关注点分离。 flink 操作吗...

回答 1 投票 0

为什么 Flink 没有均匀地分配我的工作以及如何解决这个问题?

我有一个从具有 5 个分区的数据源读取数据的 Flink 作业。我在配置文件中将每个任务管理器的并行度设置为 100。 在我的输入操作中,它只使用了5/100任务人...

回答 1 投票 0

Apache Flink 是否删除了 StreamOperatorTestHarness 类,或者它们是否转移到了不同的工件?

我正在使用intellij、maven 3和flink 1.15.1来编写有状态的流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试按照此处的文档进行操作...

回答 2 投票 0

如何刷新Apache flink中的redis缓存?

我有一个需求,需要从flink中的redis缓存中读取数据,但是按照需求,缓存数据平均每两个小时刷新一次。我正在查看文档并且

回答 1 投票 0

Flink 重复数据删除 - out.collect() 如何处理无界流?

以下是代码: 公共类验证重复{ 公共静态无效主(字符串[] args)抛出异常{ StreamExecutionEnvironment env = StreamExecutionEnvironment。

回答 1 投票 0

在 Flink 1.17 中没有看到 Kinesis 连接器包

我们在 Flink 作业中使用了 flink(1.16) 包的 Flink kinesis 连接器库。现在我们计划迁移到 Flink 1.17,但我看到 flink-kinesis-connector 包已从 Fli 中删除...

回答 1 投票 0

Flink flatMap() - NullPointerException

以下是代码: 公共类验证重复{ 公共静态无效主(字符串[] args)抛出异常{ StreamExecutionEnvironment env = StreamExecutionEnvironment。

回答 2 投票 0

为什么 Flink ValueState.value() 有时会错误返回 null?

我在 Flink 应用程序中遇到错误,在 KeyedProcessFunction 内调用 myValueState.value() 有时会返回 null,尽管代码中的逻辑应该保证...

回答 1 投票 0

启用增量检查点后,state.checkpoints.num-retained 的值应该是多少

根据 Flink 文档,state.checkpoints.num-retained 默认值为 1,这意味着 Flink 只在内存中保留一个检查点。我们在工作中启用了增量检查点。我...

回答 1 投票 0

Flink - 有多少水印被注入到这个有界流中?

在下面的代码中: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator> dataStream = env.

回答 1 投票 0

Flink - 为什么有界流打印顺序不正确?

在下面的代码中: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource> dataStream = env.fromElement...

回答 1 投票 0

Flink - 何时不注入水印?用于开窗

我了解到,任何流数据的窗口化都使用水印作为边界 在下面的答案代码中,生成的流中没有注入水印(fromElements()) 案例类记录...

回答 1 投票 0

使用 Flink SQL 时并行性是如何工作的?

我知道,在 Flink Datastream 世界中,并行性意味着每个槽将获得事件的子集 [1]。 Flink 程序由多个任务组成(转换/运算符、 数据源,以及

回答 1 投票 0

如何将Table转换为包含数组类型的DataStream(Flink)?

我有关于 Flink (1.13+) 的 table-api 的问题。我有一个包含多个字段的 POJO,其中之一是: 列表 my_list; 我使用以下声明创建我的表...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.