Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
PoJo 到 Avro 序列化抛出 KryoException:java.lang.UnsupportedOperationException
我的单元测试在 Flink 1.11.2 和 parquet-avro 1.10.0 下运行正常,一旦我使用 parquet-avro 1.12.0 升级到 1.12.0,我的单元测试将抛出异常 com.esotericsoftware.kryo.KryoException:java.lang.
Flink:联合多个kafka源并将它们合并在一起时哪种水印策略合适?
我是flink的新人。我有五个具有不同数据模式的无限卡夫卡源。我想减少消息,然后使用相同的密钥外部连接所有 kafka 源。 所以我使用 union 将它们组合在一起......
enableObjectReuse() 会导致基于 Flink-SQL 的作业中的数据差异
在基于 Flink-SQL 的作业中,我们没有任何自定义类型、序列化器/反序列化器,env.getConfig.enableObjectReuse() 会导致数据差异吗? 创建表 T1 (...) WITH ( 'connector' = 'upsert...
使用基于键控状态的运算符处理流后,我们的流不能再保证是有序的。 因此,我实现了一个排序运算符,它利用处理时间窗口来
Flink 1.15:为 DataStream API 设置 BATCH 执行模式时出错
我正在使用 Flink 1.15 DataStream api 来执行 ETL 工作。我想设置我的作业集BATCH执行模式,所以我使用官方网站中提供的代码。 env.setRuntimeMode(RuntimeExecutionMode.BATCH); 然而...
TumblingProcessingTimeWindow的最小时间间隔
当我设置TumblingProcessingTimeWindows.of(Time.milliseconds(100)))时,可以正确生成窗口。我在这里打印 windowStartTime 和 windowEndTime: 在此输入图像描述 但如果我
使用 GCP PubSub 源在 Flink 作业上获取 http2 异常
我有一个使用 GCP PubSub 作为源的 flink 作业。尽管我能够处理在 pubsub 主题上收到的消息,但我发现它存在一些问题: 已处理的消息较早...
ClassNotFoundException: org.apache.flink.client.CliFrontend 使用 flink 运行 jar 文件时
我已经在 Microsoft Windows 上下载了 Apache Flink 并运行了 start-local.sh。它有效,我可以通过导航到 http://localhost:8081/ 成功访问 Web 界面。 但当我尝试...
在 Flink 文档中,我们被警告:“广播状态中的事件顺序可能因任务而异”。 如何实现跨任务具有一致广播状态的应用程序,
如何使用带有检查点的 Flink 来使用 S3 存储桶中的文件以进行故障恢复
我有一个用例来使用给定 S3 存储桶中存在的文件。 问题是我想确保 Flink 作业仅处理文件的每一行一次,以防作业重新启动。 如果是的话...
应用程序停止时如何从数据库流中的最后一个快照ID启动Flink应用程序
我正在 Java 中创建一个从 Iceberg 流式传输的 AWS Flink 应用程序,想知道 Flink 是否具有提供从成功的最后一个快照 ID 重新启动流的可能性的机制
Flink 作业提交到多虚拟机 Flink 集群失败,并出现 JobSubmissionException、CompletionException 和 Connection Refused 错误
我可以在笔记本电脑上使用 1 个作业管理器和 3 个任务管理器启动 Flink 集群(版本 1.17.x)。集群启动,作业可以在本地主机(我的笔记本电脑)上正确提交。 下一步是...
使用 GCP PubSub 源时在 Flink 作业上获取 DEADLINE_EXCEEDED
在我正在使用的 Flink 作业中使用 Pub/Sub 源时,以下错误每 15 秒就会重复一次。 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/ 苏...
在 Flink 应用程序中从 AWS S3 存储桶读取 ORC 文件
我们正在使用 Flink 版本 1.13.5 并尝试从 AWS S3 位置读取 ORC 文件。而且,我们正在将应用程序部署在自我管理的 Flink 集群中。请查找以下代码以了解更多信息...
根据文档“No RocksDB state backend”到广播状态。 这是否意味着每次失败(任务级别或整个 JVM)时,新重新启动的任务都会...
为什么 Flink 在每次调用测试工具中的 processElement 后都会重置我的状态?
我正在使用 Flink 的 KeyedOneInputStreamOperatorTestHarness 并调用 processElement 两次。 processElement 将更新状态以计算所看到的元素数量。 在这种情况下,拨打电话后
将相同的方法引用传递给 apache flink 中的过滤器会抛出 classcastException
以下代码抛出 ClasscastException (java.lang.ClassCastException: class java.lang.Integer无法转换为 class java.lang.String ) 最终数据流源 st1 =
Flink 与 Kafka Source 和 Iceberg Sink 不写
使用Flink我尝试从Kafka读取数据,将Protobuf事件转换为Json字符串并将其写入Iceberg中的表中。 我按照官方文档编写了代码,但我必须有
我有 Flink 流处理应用程序,它从 Pulsar Topic 读取消息流,处理它们并将文件存储在 S3 中。它执行以下操作。 每隔一段时间阅读 Pulsar 主题的消息...
Flink SQL Streaming - 如何在记录更改不确定的情况下有效地连接表
卡卡主题(输入:table1,table2,输出:table3) Flink SQL 流作业 创建临时视图distinct_table1 AS 选择 * 从(选择*, ROW_NUMBER() OVER(按 id 分区,按change_date d 排序...