apache-flink 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

读取数据,保持数据N秒,写入数据(Kafka,Flink)

应用程序从kafka主题中读取。每条消息必须是唯一的(重复忽略)保存“N”秒的数据并写入不同的kafka主题作为单独的消息有没有办法保持...

回答 2 投票 0

如何在Flink中跳过损坏的邮件?

如何在Flink中跳过损坏的消息我有DAG:KafkaSrcConsumer> FlatMap> Window> SinkFunction现在如果我在运算符“KafkaSrcConsumer”中从Kafka获取corruptedMessage,我想抛出/跳过...

回答 1 投票 0

keyBy是否跨Flink(scala)中的并行任务对DataStream进行分区?

我想在Flink中的输入数据流上应用ProcessFunction(),以使用单个缓存对象处理每个传入元素。我的代码看起来像这样:对象myJob扩展...

回答 2 投票 0

在同一台计算机上运行两个Apache Flink集群

我想在一台机器上运行两个Apache Flink实例,在单独的unix用户下运行。我已经为jobmanager.rpc.port以及rest.port设置了不同的端口,但是在尝试启动时...

回答 1 投票 0

减少键控流中的功能行为

对于我们的一个用例,我们需要根据文件中的更改重做一些计算,然后广播此文件的结果,以便我们可以在其他流中使用它。 ...的生命周期

回答 1 投票 0

Flink,我在哪里可以找到ExecutionEnvironment #readSequenceFile方法?

我有hdfs数据文件,最初由mapreduce作业创建,输出设置如下,job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(BytesWritable.class);工作....

回答 1 投票 1

如何在Java中使用Flinks Splitter方法

flink文档中有一个示例,用于在scala中使用Splitter方法,如下所示://创建一个K TrainTestDataSets数组val dataKFolded:Array [TrainTestDataSet] = ...

回答 1 投票 1

为Apache-Flink提交作业时的Amazon EMR在Hadoop可恢复时出错

添加了依赖关系Pom详细信息: org.apache.flink flink-clients _2.11

回答 1 投票 0

Flink Data Stream CSV Writer不将数据写入CSV文件

我是apache flink的新手并且正在尝试学习数据流。我正在读取学生数据,该数据包含来自csv文件的3列(名称,主题和标记)。我在标记上应用了过滤器,只选择了......

回答 2 投票 0

为什么申请任务管理器时,'-n'或'-yn'等参数不起作用

运行follow命令将作业提交给yarn,但不期望分配任务管理器的数量。集群有足够的资源<220vcores,720G mem> /data/clusterserver/flink-1.7.2/bin / ...

回答 1 投票 2

Flink:如何在TaskManager中设置系统属性?

我有一些来自kafka的代码读取消息,如下所示:def main(args:Array [String]):Unit = {System.setProperty(“java.security.auth.login.config”,“someValue”)val env = .. 。

回答 1 投票 0

丢失检查点协调员后是否可以恢复

我正在使用RocksDB的增量检查点并将检查点保存到远程目标(在我的情况下为S3)。如果有人删除了作业管理器服务器(检查点的位置......)会发生什么?

回答 1 投票 0

Flink检查点到Google云端存储

我正在尝试在GCS中配置flink作业的检查点。如果我在本地运行测试作业(没有docker和任何集群设置),一切正常,但如果我使用docker -...运行它会失败并显示错误。

回答 2 投票 0

LinkedHashMap更改为HashMap并在flink数据流运算符中崩溃

给定这个虚拟代码:1 case class MyObject(values:mutable.LinkedHashMap [String,String])... 2隐式val typeInfoString:TypeInformation [String] = TypeInformation.of(classOf [String])3 ...

回答 1 投票 1

检查点恢复过程如何工作?

我正在尝试了解检查点恢复的过程,到目前为止,我只发现了有关增量检查点机制如何工作的信息。通常,当你谈论备份时......

回答 1 投票 1

使用java进行Apache Flink CEP模式检测[apache-flink]

我想要做;从CEP开始,使用地图结构中包含的任何arraylist元素,并继续使用我已经开始的其他arraylist元素。地图和图案结构:......

回答 2 投票 1

Flink窗口聚合(和其他操作)的结果是否在快照中持续存在?

例如,如果我窗口中的某些数据和.aggregate,如果系统出现故障,我创建的聚合器是否会重置?

回答 1 投票 0

快速TaskManager超时?

我正在运行一个Flink应用程序(通过Yarn),似乎随机有时任务管理器超时,这里是错误:java.util.concurrent.TimeoutException:具有id的TaskManager的心跳...

回答 1 投票 0

这是非侵入性的吗?

我想尝试使用apache flink,hadoop和solr进行非侵入式日志摄取,以便处理和索引它们并进行一些分析和查询。我正在考虑使用flink来获取数据......

回答 2 投票 0

国家后端的澄清

我一直在阅读Flink文档,我需要一些澄清。希望有人可以帮助我。状态后端 - 这基本上是指我的操作数据所在的位置......

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.