Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
我正在尝试获取一个示例,说明如何将其沉入一个已弃用的文件位。 flink 文档没有帮助,因为我得到的所有内容都已弃用。 公共静态无效主(字符串[]参数) {...
AWS 托管的 Flink 不支持在启动 Flink 应用程序时设置入口类?
通读AWS Managed Flink的文档,特别是“运行时属性”部分,我没有发现任何提及为给定Flink jar设置主类的内容。 例子...
如何使用 PyFlink/Flink 使用 Table API 写入 amazon s3 上的 Apache Iceberg?
settings = EnvironmentSettings.new_instance().in_streaming_mode().build() t_env = TableEnvironment.create(environment_settings=settings) 目录名称=“胶水目录” staging_database_na...
使用 Protobuf 进行 Flink List 字段序列化
我有一个 Foo 类,由 Flink 在 DataStream 中处理: 公共类 Foo { 私有 int id; 公开列表数据; // getter、setter 和构造函数 公共...
KeyedProcessFunction 在 keyBy 中使用 System.currentTimeMillis() 时抛出 NPE
我正在开发一个 Flink 作业,该作业处理 Row 元素并使用 KeyedProcessFunction 应用延迟。问题在于如何在 keyBy 函数中生成密钥。 当我使用系统时。
如何在运行时在 flink-conf.yaml 中注入条目以安全地传递机密?
我正在使用 Flink Kubernetes Operator 在 Kubernetes 中运行 Flink 应用程序,我需要安全地传递 Datadog 报告器的 Datadog API 密钥。挑战在于这个密钥需要
Apache Flink 的 Python SDK (PyFlink) Datastream API 是否支持 Windowing 等运算符?到目前为止,无论我见过多少使用 PyFlink 进行窗口化的示例,它们都使用了 Table API。数据流 API ...
假设我想为我的 Flink SQL 使用 kafka 源...它由 aiven 管理。 我怎样才能访问消息的密钥? 问题: 我正在生成有关源主题的消息,但有点...
如何处理 Apache Flink / AWS Kinesis 连接器中的无效 JSON?
我有一个简单的 Apache Flink (PyFlink) 应用程序,它使用官方 flink 连接器和 Flink TableAPI https://nightlies.apache.org/flink/flink-docs-
我有独立的 Flink 集群。当我在任务管理器上停止该进程时,作为 ChildFirst 加载的类不会被删除。经过多次启动/停止重复后,元空间超出了最大...
Flink 1.15 中的 cleanupInRocksdbCompactFilter 方法
我无法理解 Apache Flink 1.15 中有关 TTL 设置的“cleanupInRocksdbCompactFilter”方法的“queryTimeAfterNumEntries”参数。 医生说: Rocksdb 时清理过期状态
我有一个 flink 应用程序,我使用 TumblingEventTimeWindows 和 process 函数 数据流>processedEvents = rawEvents .keyBy(eventMap -> { 返回
Apache Flink AsyncRetryStrategy 与 RichAsyncFunction
AsyncRetryStrategy asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3,fixedDelay=100ms .ifResult(RetryPredicates.EMPTY_RESULT_PREDIC...
将 Flink DataStream<POJOs> 转换为 DataStream<RowData> 以供 Apache Iceberg 使用的最佳方法
我是 Flink 的新手,尝试使用 Flink 与 Kafka 作为数据输入,并使用 Iceberg 来存储数据。 这是我已完成的步骤。 从 kafkaSource 读取 java POJO 的数据流(使用 Avro s...
我在使用Flink(版本1.20)自定义UDAF时遇到了问题。我想实现一个计算中位数的UDAF,我使用了以下两种方法: 公共类 MedianUDAF2 扩展...
Confluence Flink 窗口查询中的 Flink SQL 提示
如何在 Confluence Flink 上的窗口查询中使用提示? 提示示例: /*+ OPTIONS('scan.startup.mode'='latest-offset') */ 我想在如下查询中使用它: 插入主题2(...
Java Flink NoClassDefFoundError org/apache/flink/shaded/guava30/com/google/common/io/Closer
我有一个 Java 21 应用程序,它使用 Apache Flink(版本 1.20.0)依赖项来过滤 kafka 流。 当我尝试执行我的程序时,出现以下错误: [flink-pekko.actor.default-
Flink 无法反序列化 Debezium 生成的 JSON
我正在尝试使用 Flink 来消费 Debezium 生成的更改事件日志。 JSON 是这样的: { “架构”:{ }, “有效负载”:{ “之前”:空, &
java.lang.ClassCastException:类[B无法转换为类org.apache.flink.types.Row
我使用apache pyflink 1.18.1。来自 Apache Flink kafka 源的输入数据类型如下所示, 2023-11-01, 2.7, 怀俄明州, WYURN, 怀俄明州失业率, M, %, NSA 2023-12-01, 2.6, 怀俄明州, WYU...