Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
虽然有一个文件,其中每行都包含以逗号分隔格式的客户数据 客户 ID、电子邮件 同一客户可以有多个电子邮件,因此同一客户会出现多条线路...
我有一个流媒体用例,我想在 Flink 中实现一个自定义窗口函数,其中窗口启动是基于包含特定值的事件发生的。然后,窗口将编程...
我尝试在本地(Win 10 平台)上启动 flink(v1.15.0)作为独立集群,但任务管理器无法启动,导致可用任务槽数为零。 已经关注了...
最近,我升级了现有的 Flink 作业(之前运行 Flink 1.15)以针对官方 Flink Kubernetes Operator(针对 Flink 1.18)运行,并开始看到一些奇怪的行为...
Datastream API 中的 Flink Sink Parquet 压缩
我正在使用流数据API来读取镶木地板数据并丰富写入S3文件系统。在 flink 文档中,它说用于压缩表 API 的结果文件 Parquet 格式也支持
Flink 中 AggregateFunction 的 merge() 方法
我想知道 AggregateFunction 上的 merge() 方法何时被调用。根据我从此处和此处的答案中了解到的情况,它仅适用于会话窗口并且发生在每个...
org.bson.BsonInvalidOperationException:预期为 INT64 类型的值是意外的 OBJECT_ID 类型
我尝试制作简单的 Apache Flink MongoDB 连接器代码来读取和写入 MongoDB 中的 json 数据。首先,下面的代码是 MongoDB Sink 代码。 流执行环境环境 =
我目前正在开发一个使用 flink 的 keyed state 的操作符(sink)。状态后端是基于堆的。状态 ttl 设置为 24 小时。操作员用例是这样的:首先我们捕获请求并存储
我正在对用户的访问日志数据进行实时数据处理。它基本上跟踪用户在办公室的出勤情况。 在 kafka 流中,每当用户刷门时,都会记录 ev...
我使用以下命令在笔记本电脑上运行 Flink: ./bin/start-cluster.sh 我正在编写一个简单的 Flink 作业来从一个主题读取数据并为另一个主题生成相同的数据。 公共类第三个工作{ p...
使用 Flink 的测试工具类来测试我的有状态运算符,我想编写单元测试来验证存储在运算符状态中的数据是否是我所期望的。不过,我好像...
当 kafka 主题摄取的日志数量为零时,用于检测异常的 Flink 程序不会给出任何输出
我正在编写一个pyflink程序,用于使用指数加权移动平均值对进入kafka主题的日志数量进行异常检测。话题所在的kafka经纪人是ru...
目前,我们有一个按预期工作的键控 Window flink 作业。事件进入窗口 - 一些处理是在reduce函数中完成的 - 触发器导致输出到接收器。 现在我们有一个sc...
我目前对窗口和状态感到困惑。假设我有一个程序,每分钟统计用户访问数据,需要在每个窗口中做sum统计。假设此时我配置...
我们正在构建一个 Flink 应用程序,它使用来自不同 Kafka 主题的事件。此应用程序在源上使用有界无序水印策略。在正常执行期间,一切都按 exp 工作...
当我使用FileSink从kafka源保存数据时,文件无法从inprogress状态转换为finished状态
当我使用FileSink从kafka源保存数据时,文件无法从inprogress状态转换为finished状态,但是如果我用随机生成的流替换数据源...
我正在运行一个 flink 作业,它需要两个数据源并将它们连接起来。 连接的输出是一个数据流。但如果连接失败,我也会发出一个侧面输出。现在我从
如果我在将某个操作符保持在状态一段时间后发出一个事件,如果它超过了水印,下游操作符会接受它吗?
我有一个由两个来源组成的 flink 作业。 两个源通过连接键关闭,并且处理函数连接两个流。有时数据可能会延迟 15 分钟。所以我持有...
Apache Flink 中 RoundRobin 分区的实现
嗨,我想在 ApacheFlink 中为运算符实现 RoundRobin 实现,在继续之前我想先说一下,我很清楚这已经在 Flink 中实现了,但是......
我正在尝试在 apache flink 中使用窗口实现自定义分区。目前我有以下内容: 数据流> split = 运算符聚合流 ...