flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

如何从代码中的保存点重新启动 flink

我有一个java类,正在向flink集群提交sql文件。 我有 StreamExecutionEnvironment StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

回答 4 投票 0

一个操作员如何将事件流传递给另一个操作员?

如果在 GoLang 中实现,操作员一可以通过 GoLang 通道在管道中将事件数据传递给操作员二。这有助于避免在运行时失败时出现关注点分离。 flink 操作吗...

回答 1 投票 0

为什么 Flink 没有均匀地分配我的工作以及如何解决这个问题?

我有一个从具有 5 个分区的数据源读取数据的 Flink 作业。我在配置文件中将每个任务管理器的并行度设置为 100。 在我的输入操作中,它只使用了5/100任务人...

回答 1 投票 0

Apache Flink 是否删除了 StreamOperatorTestHarness 类,或者它们是否转移到了不同的工件?

我正在使用intellij、maven 3和flink 1.15.1来编写有状态的流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试按照此处的文档进行操作...

回答 2 投票 0

如何刷新Apache flink中的redis缓存?

我有一个需求,需要从flink中的redis缓存中读取数据,但是按照需求,缓存数据平均每两个小时刷新一次。我正在查看文档并且

回答 1 投票 0

Flink 重复数据删除 - out.collect() 如何处理无界流?

以下是代码: 公共类验证重复{ 公共静态无效主(字符串[] args)抛出异常{ StreamExecutionEnvironment env = StreamExecutionEnvironment。

回答 1 投票 0

在 Flink 1.17 中没有看到 Kinesis 连接器包

我们在 Flink 作业中使用了 flink(1.16) 包的 Flink kinesis 连接器库。现在我们计划迁移到 Flink 1.17,但我看到 flink-kinesis-connector 包已从 Fli 中删除...

回答 1 投票 0

Flink flatMap() - NullPointerException

以下是代码: 公共类验证重复{ 公共静态无效主(字符串[] args)抛出异常{ StreamExecutionEnvironment env = StreamExecutionEnvironment。

回答 2 投票 0

为什么 Flink ValueState.value() 有时会错误返回 null?

我在 Flink 应用程序中遇到错误,在 KeyedProcessFunction 内调用 myValueState.value() 有时会返回 null,尽管代码中的逻辑应该保证...

回答 1 投票 0

启用增量检查点后,state.checkpoints.num-retained 的值应该是多少

根据 Flink 文档,state.checkpoints.num-retained 默认值为 1,这意味着 Flink 只在内存中保留一个检查点。我们在工作中启用了增量检查点。我...

回答 1 投票 0

Flink - 有多少水印被注入到这个有界流中?

在下面的代码中: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator> dataStream = env.

回答 1 投票 0

Flink - 为什么有界流打印顺序不正确?

在下面的代码中: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource> dataStream = env.fromElement...

回答 1 投票 0

Flink - 何时不注入水印?用于开窗

我了解到,任何流数据的窗口化都使用水印作为边界 在下面的答案代码中,生成的流中没有注入水印(fromElements()) 案例类记录...

回答 1 投票 0

使用 Flink SQL 时并行性是如何工作的?

我知道,在 Flink Datastream 世界中,并行性意味着每个槽将获得事件的子集 [1]。 Flink 程序由多个任务组成(转换/运算符、 数据源,以及

回答 1 投票 0

如何将Table转换为包含数组类型的DataStream(Flink)?

我有关于 Flink (1.13+) 的 table-api 的问题。我有一个包含多个字段的 POJO,其中之一是: 列表 my_list; 我使用以下声明创建我的表...

回答 1 投票 0

Apache Flink SQL Client 无法连接到 EKS 集群中的 JobManager 实例

我已经按照 https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#session-mode 在 Kubernetes 上部署了 Apache Flink 会话模式,并且有...

回答 1 投票 0

Flink 应用程序未接收和处理 Kinesis 连接器关闭时生成的事件

问题:Flink 应用程序无法接收和处理 Kinesis 连接器关闭时(由于重新启动)生成的事件 我们有以下 Flink 环境设置 环境

回答 3 投票 0

flink 是否总是为相同的键值返回相同的分区状态对象?

如果我们在 KeyedStream#process 的 KeyedProcessFunction 中填充一个状态对象,例如 新的 KeyedProcessFunction() { 私人地图状态 如果我们在 KeyedStream#process 的 KeyedProcessFunction 中填充状态对象,例如 new KeyedProcessFunction<String, Rule, Rule>() { private MapState<String, ArrayList<Rule>> rulesState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); rulesState = getRuntimeContext().getMapState(Descriptors.rulesPerCustomerDescriptor); } @Override public void processElement(Rule value, KeyedProcessFunction<String, Rule, Rule>.Context ctx, Collector<Rule> out) throws Exception { out.collect(value); rulesState.put(); } }); 如果另一个 KeyedStream#process 方法都使用相同的键进行分区,我们会从另一个 KeyedStream#process 方法中获得相同的状态对象吗? 不。状态对于建立状态的特定运营商而言是本地的。无法从其他任何地方访问它。

回答 1 投票 0

如何根据另一个flink数据流过滤?

我正在尝试学习 Apache Flink,下面提到的用例似乎很简单,以至于 Flink 不支持它?让我开始认为我理解的东西根本上是错误的。 用例是这样的

回答 1 投票 0

Flink statebackend - 并行性如何与 RocksDB 配合使用?

在下面的代码中: 打包支出报告; 导入 org.apache.flink.streaming.api.datastream.DataStream; 导入 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 导入 org.apache...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.