Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
如何通过保存点恢复作业,并在应用程序模式下通过executeAsync运行多作业(flink 1.18)
我正在开发 1.18 版本的 flink java,并希望使用应用程序模式在一个 pod 中运行 2 个作业(k8s docker 部署)。 在java代码中,我使用for语句使用env创建2个或更多作业。
即使所有分区中都没有新数据,如何在 Flink SQL 中为 Kafka 源提前水印?
我有一个简单的Flink(v1.17)SQL流作业,使用Kafka作为源,我配置了一些与水印相关的配置,但我似乎不明白如何强制
在 Apache Flink DataStream API 中访问 Kafka 元数据
使用 Apache Flink DataStream Connectors for Kafka 创建源时如何访问 Kafka 元数据?,我注意到在 Apache Flink Table API Connector for Kafka 中,我们能够访问记录
我想使用 Flink 合并两个(多个)流。两个流本身都是有序的,我希望合并结果也被排序。举个例子 [1,2,4,5,7,8,...] 和 [2,3,6,7,..] 应该产生
例如,我有一个 flink 窗口应用程序,其数据源是一个发送来自不同公司的员工姓名的 kafka 流。现在,如果一家公司的数据源停止提取数据,我想要
Flink 似乎有数据采样器,但我没有看到任何如何在 Flink 流应用程序中应用数据采样器的示例。 是否可以在 Flink Streaming 中应用数据采样器
我有一个 Flink 作业,我不明白为什么它不会打印到标准输出。我注意到,如果删除过滤器和水印,我会看到来自我的 kafka 主题的原始消息。但是应用聚合...
Flink Timer onTimer 事件 - 它们是否强制重新分配流?
我有 KeyedProcessFunction (名为 Stateless),可以处理每个键的数据(例如事务 id)。 无状态是轻量级的,不使用状态——它只设置计时器一个计时器。 我正在上游阅读
无法使用 apache flink 的接收器功能将带有标头的 CSV 文件写入 S3 存储桶
我的项目需要使用 apache flink(版本 1.18.0)的接收器功能将 csv 文件写入 S3 存储桶,其中包含标头。使用的编程语言是java。 Hadoop 文件系统是...
为什么Flink Exactly Once commit不会失败?
我正在使用 Flink EO。重新启动 Flink 作业时,我收到此警告: 2023-12-30 13:07:44.538 [Co-Flat 地图 -> Sink: Sink1 (3/8)#0] 警告 o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunc...
为什么 Flink Exactly Once commit 不会失败?
我正在使用 Flink EO。重新启动 Flink 作业时,我收到此警告: 2023-12-30 13:07:44.538 [Co-Flat 地图 -> Sink: Sink1 (3/8)#0] 警告 o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunc...
我在使用 Apache Flink 写入 HBase 表时遇到问题。我已经成功配置了从 Kafka 读取和写入以及从 HBase 读取 RowKey。然而,当尝试...
Flink SQL ROW_NUMBER 由于流模式下的 OVER windows 排序而导致聚合错误,必须在时间属性上定义
选择 EMPLOYEE_ID、DEPT_NAME、LOGGED_IN_AT、ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY LOGGED_IN_AT ASC ) AS ROWNUM FROM inputTable 面临错误:超过窗口在流模式下的排序 mu...
如何在 flink-conf.yaml 中的 flink 指标报告器的 filter.includes 参数中包含多个过滤器?
我想了解通用flink指标报告器的工作原理,并且我正在尝试filter.includes参数。如何在 flink-conf.yaml 中为此选项指定多个过滤器?文档
我是flink新手,不知道这是正确的方法还是愚蠢的事情,我有一个字符串数据类型的数据流,我正在尝试将数据流中的数据捕获到列表中,我正在尝试一些...
我想构建一个对数据进行一些聚合的应用程序,并使用计时器将该聚合发送到一个异步步骤,该异步步骤将其转储到其他地方。数据发送后...
Apache Flink GCS FileSink 由于许多小文件而性能不佳
我正在开发一个apache-flink(v 1.17.2)流应用程序,其中我的文件接收器是Google Cloud Storage。性能非常差,我的任务管理器 100% 忙。我相信这是有关系的...
尝试从 ADLS 设置 Pyflink 流,当前尝试使用 StreamExecutionEnvironment.from_source() 方法读取 json 文件。 代码如下所示: 来自 flink.plan.Envir...
解决Flink SQL windows tumble性能下降问题
我有一个flink sql应用程序,它将来自Kafka的数据实时分组到数据库中。数据按照时间间隔进行聚合:[1,5,10,30,60min,daily]并写入对应的DB表...
当下游任务并行度不同时,使用 KeyBy 与 reinterpretAsKeyedStream()
我在reinterpretAsKeyedStream的文档中看到这个警告 警告:重新解释的数据流必须已经以与 Flink 的 keyBy 分区方式完全相同的方式进行预分区...