Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
Flink 流式传输管道,具有 1 个 kafka 分区,具有 5 个并行度,具有带 1 分钟滚动窗口的键控事件窗口。我们使用带有默认周期性生成的升序时间戳水印...
org.apache.kafka.common.errors.UnsupportedVersionException
尝试将已运行的应用程序的 Flink 版本从 1.14.2 升级到 1.17.1。该应用程序具有与 Kafka 一样的源和接收器(不同主题同一集群)。升级版本后
如何在 apache flink 检查点中测试 RocksDB 压缩
我有一个 Flink 管道,运行着一些过滤器、映射、聚合器和 Windows 运算符。启用增量检查点的 RocksDB 后端。 我的检查点数据大小正在逐渐增长。我
我是 Flink 新手,有一个用例来使用 Topic1 中的数据并在数据库中插入/更新,并将相同的数据推送到将由不同服务使用的 Topic2。我现在拥有的代码有点......
如何与windows操作符一起配置aggregateFunction的状态TTL
我有一个flink流管道,配置了一些过滤器、map、aggregatorFunction和windows操作符(5分钟的翻滚窗口)。我正在使用增量rocksDB后端(它是存储...
我正在处理需要根据库存数据进行转换的流。本质上,库存数据会很大(没有确切的数字),并且会有大量的读取和很少的写入......
“类 java.time.LocalDate 无法转换为类 java.lang.Number”,在 Avro 中保存时,逻辑类型为日期,类型为 int
我正在尝试在 parquet 输出中保存 Avro 中存在的具有日期逻辑类型的字段(它在使用 int 作为数据类型保存时有效,但在尝试保存为日期逻辑类型时给出错误...
我的flink sql语句如下 创建或替换表 table_one /** mode('streaming')*/ ( `pk` 字符串, `id` 字符串, `段`数组, `头...
Flink 无法解析 DataStream API 中 Debezium 写入 Kafka 的 Avro schema
似乎没有解决方案可以自动将 Confluence SchemaRegistry 的最新 schema 与 Kafka-topic 中的数据同步。因此,我通过从 Confluence Schema Regi 复制来手动添加架构...
我是 flink 新手,正在阅读 Flink 1.8 源代码(https://github.com/apache/flink/tree/release-1.8)来了解 flink 如何与 YARN 配合使用。 我知道有分离模式和非分离模式...
我正在使用 Flink 流从多个资源(包括文件)读取输入。我的目标是触发一些 定期计算(处理时间)并在到达文件末尾时触发...
如何在 Apache Flink 中连接 RabbitMQ“流”?
有没有办法连接RabbitMQ“流”而不是Apache Flink中的队列? 谢谢你! 我能够获取 RabbitMQ 队列作为在 Apache Flink 中工作的源,但如何获取 RabbitMQ s...
TestProcessingTimeService 和 InternalTimeServiceManagerImpl 有什么区别?
使用 ProcessFunctionTestHarnesses 对 Apache Flink 进行单元测试时 KeyedOneInputStreamOperatorTestHarness testHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunctio...
将 Flink SQL Client 连接到 Kubernetes 上的 Flink 集群
我已经使用 Terraform 和 Helm 在 Kubernetes 上设置了 Flink 集群。我使用了 Flink Operator,并且还单独将 Flink SQL Client 下载到我的机器上。 资源“helm_release”...
使用 Flink 从数据库动态获取新记录并发布到 kafka/axon 主题
我是 Flink 新手,有 2 个 Flink 作业的用例,第一个是使用 Topic1 中的数据并在数据库中插入/更新,第二个是获取数据库中插入/更新到由不同服务使用的 Topic2 的任何新数据。 .
我有一个kafka主题,其中包含一些事件类型。(这是给出的) 事件是 JSON 文档。 我们将事件类型称为:A、B、C、D、E。 我可以通过使用每个字段中的字段来判断类型
我在 Flink 中有一个用户案例,我需要连接到 2 个广播流来执行某个 KeyedProcessFunction 操作。 Flink允许这样做吗?我看不到任何支持多个
我有一个 Kafka 主题,其中我每 2-3 秒生成一个条目 然后我有 PyFlink 作业,它将格式化条目并将它们发送到数据库 这是我的 Flink 环境设置 env = StreamExecutionEnvironm...
我正在使用以下方式运行 Flink 应用程序 python -m 应用程序.job job.py 是目录 app 中的一个模块 它运行良好并使用 PyFlink 处理数据。 但是,该作业不会提交到本地集群(Docker)......
假设我们有一个 DataStream,并且可以将 MapState 附加到每个 String 元素,同时将其传递到下游。喜欢: ds.keyBy(s -> s.hashCode() % 10) .process(new KeyedProcessFunci...