Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
应用程序从kafka主题中读取。每条消息必须是唯一的(重复忽略)保存“N”秒的数据并写入不同的kafka主题作为单独的消息有没有办法保持...
如何在Flink中跳过损坏的消息我有DAG:KafkaSrcConsumer> FlatMap> Window> SinkFunction现在如果我在运算符“KafkaSrcConsumer”中从Kafka获取corruptedMessage,我想抛出/跳过...
keyBy是否跨Flink(scala)中的并行任务对DataStream进行分区?
我想在Flink中的输入数据流上应用ProcessFunction(),以使用单个缓存对象处理每个传入元素。我的代码看起来像这样:对象myJob扩展...
我想在一台机器上运行两个Apache Flink实例,在单独的unix用户下运行。我已经为jobmanager.rpc.port以及rest.port设置了不同的端口,但是在尝试启动时...
对于我们的一个用例,我们需要根据文件中的更改重做一些计算,然后广播此文件的结果,以便我们可以在其他流中使用它。 ...的生命周期
Flink,我在哪里可以找到ExecutionEnvironment #readSequenceFile方法?
我有hdfs数据文件,最初由mapreduce作业创建,输出设置如下,job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(BytesWritable.class);工作....
flink文档中有一个示例,用于在scala中使用Splitter方法,如下所示://创建一个K TrainTestDataSets数组val dataKFolded:Array [TrainTestDataSet] = ...
为Apache-Flink提交作业时的Amazon EMR在Hadoop可恢复时出错
添加了依赖关系Pom详细信息: org.apache.flink flink-clients _2.11
Flink Data Stream CSV Writer不将数据写入CSV文件
我是apache flink的新手并且正在尝试学习数据流。我正在读取学生数据,该数据包含来自csv文件的3列(名称,主题和标记)。我在标记上应用了过滤器,只选择了......
运行follow命令将作业提交给yarn,但不期望分配任务管理器的数量。集群有足够的资源<220vcores,720G mem> /data/clusterserver/flink-1.7.2/bin / ...
我有一些来自kafka的代码读取消息,如下所示:def main(args:Array [String]):Unit = {System.setProperty(“java.security.auth.login.config”,“someValue”)val env = .. 。
我正在使用RocksDB的增量检查点并将检查点保存到远程目标(在我的情况下为S3)。如果有人删除了作业管理器服务器(检查点的位置......)会发生什么?
我正在尝试在GCS中配置flink作业的检查点。如果我在本地运行测试作业(没有docker和任何集群设置),一切正常,但如果我使用docker -...运行它会失败并显示错误。
LinkedHashMap更改为HashMap并在flink数据流运算符中崩溃
给定这个虚拟代码:1 case class MyObject(values:mutable.LinkedHashMap [String,String])... 2隐式val typeInfoString:TypeInformation [String] = TypeInformation.of(classOf [String])3 ...
我正在尝试了解检查点恢复的过程,到目前为止,我只发现了有关增量检查点机制如何工作的信息。通常,当你谈论备份时......
使用java进行Apache Flink CEP模式检测[apache-flink]
我想要做;从CEP开始,使用地图结构中包含的任何arraylist元素,并继续使用我已经开始的其他arraylist元素。地图和图案结构:......
Flink窗口聚合(和其他操作)的结果是否在快照中持续存在?
例如,如果我窗口中的某些数据和.aggregate,如果系统出现故障,我创建的聚合器是否会重置?
我正在运行一个Flink应用程序(通过Yarn),似乎随机有时任务管理器超时,这里是错误:java.util.concurrent.TimeoutException:具有id的TaskManager的心跳...
我想尝试使用apache flink,hadoop和solr进行非侵入式日志摄取,以便处理和索引它们并进行一些分析和查询。我正在考虑使用flink来获取数据......
我一直在阅读Flink文档,我需要一些澄清。希望有人可以帮助我。状态后端 - 这基本上是指我的操作数据所在的位置......