Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
我正在远程集群上工作。我有一个运行很长时间的任务,似乎有问题,所以我通过网络客户端甚至命令行取消了该任务。不幸的是取消了
我的想法是设置一个参数,根据每个窗口的数据调整它的值,并使用这个值来调整下一个窗口的大小。我应该如何实现这个功能 Flink 不...
就像标题所描述的那样, 我已经在我的Mac本地启动了flink 但我无法打开 localhost:8081 网站查看 flink 状态。 当我像官方网站一样运行 wordCount 示例时...
如何在Flink服务器上连续运行apache flink流作业
您好, 我为流作业编写了代码,其中源和目标是 PostgreSQL 数据库。我使用 JDBCInputFormat/JDBCOutputFormat 来读取和写入记录(参考示例)。 代码:
如何在Flink 1.13.5中构建ActorSystem?
这就是我在 Flink 1.8.5 中构建 ActorSystem 的方式。 公共静态 ActorSystem createNewActorSystem() 抛出异常 { String ip = HostPortUtil.getLocalIp(); 配置配置=新
无法获取 CsvReaderFormat 以供 Flink 读取 CSV 文件
我正在尝试使用文档中提到的 CsvReaderFormat 读取 flink 执行类中的 CSV 文件的简单任务。我有一个名为subscriberADSR的pojo(我知道这是一个不好的做法......
Apache Flink - 高 promethues 指标基数
在我们的组织中,我们有许多系统在 flink 1.16 上运行。 我们使用 PrometheusReporterFactory。 将我们的指标暴露给 promethues scrape。 由于 flink 的动态标签定义
出现以下错误 引起:java.lang.NoClassDefFoundError:org/apache/kafka/clients/admin/AdminClient 将 flink 连接到 kafka 时 我正在使用 flink 1.17 并使用 flink-sql-connector-kafka...
Apache Flink 中自定义 Partitioner 实现的分区方法是否需要线程安全?
我正在通过扩展 Apache Flink 中的 org.apache.flink.api.common.functions.Partitioner 接口来实现自定义分区器。这需要我重写分区方法。我的问题是...
我遇到了与java.util.List和java.util.Map的Flink序列化所描述的相同的问题,但对于java.util.Set。 我有一个 POJO 类,其中包含列表字段和设置字段。我已经能够...
Flink Kafka GroupId 在使用 KafkaSource 时似乎被忽略了
我是 Apache Flink 的新手。我尝试使用 Flink 的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,看起来效果很好。重新启动 flink 任务后,我得到了相同的信息...
Flink:无法在类路径中找到实现“org.apache.flink.table.factories.CatalogFactory”的标识符“kafka”的任何工厂
我正在尝试将Kafka连接到Flink并通过sql-client.sh运行。但是,无论我如何处理 .yaml 和库,我都会不断收到错误: 线程“main”org.apache.flink 中出现异常。
我使用以下简单代码来说明文件系统连接器的行为。 我有两个观察结果想要询问并确认。 如果我没有启用检查点,那么所有
在flink 1.14中,我怎样才能拥有一个同时写入kafka和其他数据源的接收器。 我尝试创建一个扩展 RichSinkFunction 的自定义接收器。在开放方法中,我也
我的数据源发出具有以下结构的物联网数据 - io_id、值、时间戳 232,1223,1718191205 321,671,1718191254 54,2313,1718191275 232,432,1718191315 321,983,1718191394 ………… 有...
我正在 Apache Flink 中编写数据流处理代码,但我在 Apache Flink 文档中找不到任何提及基于计数的窗口的内容。 我检查了下面的链接,但是...
java.util.HashMap 和 java.time.Duration 不是 Flink 的有效 POJO 类型
我的 Flink 1.15 应用程序的日志中有“java.time.Duration”和“java.util.HashMap”的提示: 类 class <*> 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO fie...
使用 Table API 将作业名称设置为 Flink 作业
我想为使用 Table API 编写的 Flink 应用程序设置一个作业名称,就像我使用 Streaming API env.execute(jobName) 所做的那样。 我想更换: 我在文档中找不到方法,除了......
Flink Operator 卡在 100% 繁忙状态,如何解决?
我已将 Flink 集群部署为纱线应用程序。作为纱线配置的一部分,我将 32 个 vCore 关联到每个任务管理器。我还为每个任务管理器分配了 2 个插槽。 工作管道:Kafka
我有一个用 scala 编写的 flink 作业,我正在创建一个自定义指标来计算流中的事件数量。该作业部署在 kubernetes 上,我看到了作业管理器和任务的系统指标-