Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
最近在linux系统安装了Flink kubernetes operator。我为 FlinkDeployment 和 FlinkSessionJob 创建了 yaml 文件。 FlinkDeployment创建成功但是FlinkSessionJob无法创建...
如何在 Flink-SQL 中将 json 字符串转换为 Array<Row<>>?如何使用 Flink-SQL 将嵌套的 json 数组字符串写入 Elasticsearch?
如何使用sql api创建数组列并发送elasticsearch? https://github.com/ververica/flink-cdc-connectors/issues/2147 我需要在 elasticsearch 中创建一个大型物化视图,whi...
在自定义 Flink KeyedProcessFunction 之间共享客户端对象
我有一个对象,它是外部服务的客户端,我正在从中获取数据以与另一个 Kafka 源连接,该源是 Flink 拓扑中的运算符。 有什么方法或技巧可以...
尝试使用本地 Kinesis 流在本地运行 flink 应用程序。 下面的代码完美地工作(如,记录可以在接收器表路径中看到),但是当我从
我有一个值状态,它存储一些计算数据,这些数据将被频繁访问并且计算起来非常昂贵,所以我使用一个值状态在键控过程函数中缓存该信息。
在Flink中做keyBy操作后,为Data Stream中的每个key创建单独的Table
我的需求是为两个不同的数据流中的每个键创建单独的表,然后将它们连接起来。我已经成功地从 Flink 中的数据流创建了两个单独的表并执行...
如何在 Flink 中为 GenericRowData 类型对象提供 TypeInformation
我正在使用反序列化器解析 Kafka 流(JSON 字符串),然后我使用 GeneericRowData 类将对象节点转换为类型 RowData 实例,hudi 支持 w...
我的用例有点独特,我需要一些帮助,看看我如何或是否可以通过 Flink 做到这一点。我有一个来自 Kafka 主题的数据流,其中有一个字段(例如用户 ID,前夕...
Flink 自定义触发器不触发 onProcessingTime
我有一个当前实现的自定义触发器,它应该在处理了 maxElements 或 timeoutMs 已经过去时触发。目前,触发器的 maxElements 部分有效......
在两个不同的翻滚时间窗口中从 Kinesis 数据流(或任何数据流)读取
我有一个 Kinesis 数据流,我正在使用滚动窗口使用数据。我有两个用例,一个是在 5 分钟的翻滚窗口中使用数据,另一个是 1 分钟的翻滚窗口......
Apache Flink:从 `window(...).apply` 迁移到 `window(...).redcuce`
我有一个 Flink 应用程序,它会在一定时间内收集事件,然后将它们下沉。我使用了应用功能: .window(TumblingEventTimeWindows.of(Time.minutes(30))) .申请(
对于 Apache Flink 聚合,是拥有复杂状态的聚合更好,还是拥有更小的聚合但更多的任务。 例如,如果我有一个关于用户观看视频的数据流......
我想通过 HTTP 协议将我的 DataStream 流的计算结果发送到其他服务。我看到了两种可能的实现方式: 在接收器中使用同步 Apache HttpClient 客户端 公共...
我每天都在整理一些结果。 我对我的数据进行了窗口连接,窗口大小为 1 天。 kafka数据源有两个:A和B。 我需要对 A 进行过滤并删除重复项...
PyFlink KafkaSink 抛出 AttributeError: 'NoneType' object has no attribute 'startswith'
我正在尝试阅读一个 kafka 主题,并使用 pyflink(flink 版本 1.16)中的 KafkaSource/KafkaSink 在另一个 kafka 主题中编写相同的内容。阅读 kafka 主题作品,我能够打印 re...
Flink + Sink to S3 saving to wrong folder under the bucket
我正在使用以下 SQL 查询以镶木地板格式将数据接收到 S3: 插入 sink_table_s3 选择 event_id, event_type, event_name, DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-...
如何将 Flink S3 sink Delivery mode 从 EXACTLY_ONCE 更新为 AT_LEAST_ONCE
默认情况下,Flink S3 接收器以 EXACTLY_ONCE 交付模式写入。我想将它更新为 AT_LEAST_ONCE 模式。我无法找出需要更新哪些配置才能实现。 从...
我有一个 RichAsyncFuntion 并希望在其中运行计划任务。我正在尝试使用 ProcessingTimeService.ProcessingTimeCallback 但无法从
如何在 Flink streaming 中缓存算子级别的并发 hashmap?
可能类似于此处发布的问题,但我的目标是在单个平面映射运算符中跨所有并行性共享并发哈希图。 我有一个包含 的哈希图 可能类似于 here 发布的问题,但我的目标是在单个平面映射运算符中跨所有并行性共享并发哈希图。 我有一个包含 映射的 hashmap,我希望这个映射在运行我的 flatmap 函数的所有任务槽之间共享。 我想要它共享的原因是因为我的平面图是通过 Tuple2 中的 2 值键控的,第一个值是我关心的键,第二个值是辅助键,因为我希望每个事件都有两个键组合最终成为一名工人并聚合。因此,第一个键可能会出现在多个工作线程中,所以我想要一种方法来像某种缓存一样在所有工作线程之间共享一个巨大的哈希图。 Flink 可以吗?这是个好主意吗?谢谢。
Pyflink->Elastic 将 Varchar 转换为 Long?
我上周开始使用 Pyflink,发现自己陷入了困境。 基本上我尝试从源 A 导入数据并将其下沉到 Elastic,效果很好,但有一个特殊的 ...