spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

如何在本地模式下更改执行程序的数量?

是否可以使用某些Spark Conf设置在本地模式下为Spark Streaming应用程序设置多个执行程序?目前,我无法看到Spark UI在性能方面的任何变化或......

回答 3 投票 2

流处理框架与数据库

为什么不可能使用游标,PL SQL,触发器等使用数据库sql进行实时流处理。为什么我们需要一个单独的流处理框架,如Spark。

回答 1 投票 0

Spark不会在地图功能中打印控制台上的输出

我有一个在集群模式下运行的简单Spark应用程序。 val funcGSSNFilterHeader =(x:String)=> {println(!x.contains(“servedMSISDN”)!x.contains(“servedMSISDN”)} val ssc = new ...

回答 2 投票 2

弹性搜索 - 多个字段作为Spark中的映射ID

我对弹性搜索很新。我正在使用elasticsearch-hadoop 6.2.4版本,我正在从HDFS读取文件,转换为bean对象并写入弹性搜索。我正在使用Spark ......

回答 2 投票 1

Twitter spark streaming:登录尝试次数过多

我正在运行这个由apache提供的示例来传输推文。但是,我无法获取任何数据,因为似乎流API不断尝试连接到Twitter,从而导致此跟踪:...

回答 1 投票 0

_spark_metadata为什么所有镶木地板分区文件都在0内,但集群有2个工人?

我有一个小火花簇,一个主人和两个工人。我有一个Kafka流媒体应用程序,它从Kafka传输数据并以镶木地板格式和附加模式写入目录。到目前为止我...

回答 1 投票 0

计算我的RDD在大型Dstream中的记录

我正在尝试使用文件DStream读取的大型RDD。代码如下所示:val creatingFunc = {()=> val conf = new SparkConf()。setMaster(“local [10]”)...

回答 2 投票 2

Kafkaconsumer对多线程访问不安全

我使用下面的代码来读取Kafka主题,并处理数据。 JavaDStream transformedMessages = messages.flatMap(record - > processData(record))。transnsform(new ...

回答 4 投票 11

如何在驱动程序中提取累加器值?

以下是我的代码排序方式,//累加器初始化val count = new LongAccumulator sparksession.sparkContext.register(count,“count accumulator”)// Streaming Transformation val DF = ...

回答 1 投票 0

默认情况下,spark的Cache内存限制是多少?

spark中缓存的最大限制是多少。它可以同时保存多少数据?

回答 3 投票 1

Spark 2.3.1结构化流输入速率

我想知道是否有办法在Spark Structured流中指定小批量的大小。那不仅仅是说明小批量间隔(触发器),我想说明有多少行......

回答 1 投票 0

如何使用直接流在Kafka Spark Streaming中指定使用者组

如何使用直接流API为kafka spark流指定使用者组ID。 HashMap中 kafkaParams =新的HashMap (); kafkaParams.put(“metadata.broker ....

回答 2 投票 3

找到csv:readStream的多个源

我试图运行下面的代码将文件作为数据帧读取到Kafka主题(用于Spark Streaming),通过Eclipse IDE开发,使用Scala,通过运行瘦jar来适当地定义模式...

回答 1 投票 0

如何热切地执行火花变换?

由于火花的变换被懒惰地评估,我们有什么方法可以急切地执行变换?是否需要进行任何配置更改?例如,我有......

回答 1 投票 1

在Spark Structured Streaming中处理二进制数据

我正在使用Kafka和Spark Structured Streaming。我收到以下格式的kafka消息。 {“deviceId”:“001”,“sNo”:1,“data”:“aaaaa”} {“deviceId”:“002”,“sNo”:1,“data”:“bbbbb”} {“deviceId” :” ...

回答 1 投票 1

如何在spark-submit命令中指定要使用的java版本?

我想在远程服务器上的纱线群集上运行火花流应用程序。默认的java版本是1.7,但我想使用1.8作为我的应用程序,它也在服务器中,但不是...

回答 4 投票 8

如何在Hive DataBase中插入Kafka Spark流式Json数据

我的Streaming JsonData就在这里:{“visibility”:“public”,“response”:“yes”,“guests”:0,“member”:{“member_id”:145170662,“photo”:“http:\ / \ /photos2.meetupstatic ....

回答 2 投票 -1

spark 2.1.1:解析的JSON值与类构造函数不匹配

我对spark 2.1.1和json4s.jackson有一个奇怪的问题。我从spark 1.5.1升级了我的流媒体项目。现在当我在IDE中执行代码时,一切正常。但是在组装和代码之后......

回答 2 投票 7

为什么即使没有新数据,Spark Streaming也会执行foreachRDD?

我有以下Spark流示例:val conf = new SparkConf()。setAppName(“Name”)。setMaster(“local”)val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(2)) val ...

回答 1 投票 1

在kafka ofset mgt期间面临问题的“无法访问包kafka010中的对象分配”

环境:Kafka 10,Spark 2.1我试图存储Kafka偏移外部存储。通过Apache Spark网站和一些在线研究后,我能够编写以下代码。现在......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.