spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

用于标签的标量正则表达式

我是IT领域的新手,我可以使用Google搜索解决大多数问题,但是这次我无法成功完成,所以我决定寻求帮助,呵呵。我在Scala中编写代码,其中...

回答 1 投票 2

Spark Twitter API失败

您好,我从一个星期以来一直在努力尝试在我的Spark应用程序中从Twitter读取流行标签。我的环境如下所示,是我们组织的客户Cdh 5.14的设置...

回答 1 投票 0

Spark流直接方法中的语义恰好一次

Spark的官方文档说,基于Direct的方法涉及使用SimpleConsumer API,该API不使用Zookeeper存储偏移量,而是使用Spark的元数据存储偏移量...

回答 1 投票 0

执行聚集后如何在Spark中获得未聚集的列?

我正在使用spark-sql-2.4.1v。这里我有这样的情况val df = Seq((2010,“ 2018-11-24”,71285,“ USA”,“ 0.9192019”,“ 0.1992019”,“ 0.9955999”),(2010,“ 2017-08-24 “,71286,”美国“,” 0.9292018“,” 0 ....

回答 1 投票 0

从Kinesis读取Pyspark中的数据

我正在尝试使用KinesisUtils.createStream从Pyspark中的kinesis读取数据,但问题是我遇到此错误。在类路径中找不到Spark Streaming的Kinesis库。尝试以下一种...

回答 1 投票 0

如何在Spark中计算执行者之间的指标

我有一个火花执行程序,其中有许多执行程序。我希望能够在执行程序上使用计数器来计算事件发生的次数。例如,计算...

回答 1 投票 0

如何正常停止笔记本流作业?

我有一个流应用程序,正在运行到Databricks笔记本作业中(https://docs.databricks.com/jobs.html)。我希望能够使用stop()优雅地停止流式作业...

回答 1 投票 0

防止使用Spark Java将重复数据加载到DB2中

我有一个用Java编写的Spark流传输程序,我试图在其中使用来自Kafka主题的消息,转换为数据帧并加载到DB2数据库。我需要执行某些操作(...

回答 1 投票 -2

如何在Spark Scala中更快地处理.gz文件?

我正在读取.gz文件,如val df = spark.read..format(“ csv”).option(“ header”,“ true”).option(“ delimiter”,“ |”).load(“ filepath .gz“)df.createOrReplaceTempView(” df“)当我...

回答 4 投票 -3

如何每5分钟刷新流应用中的元数据数据框?

我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。随着卡夫卡。我有一个来自kafka主题的财务数据场景,例如FinanceDf I ...

回答 1 投票 0

如何查找在Spark文件流中处理的文件

我有一个结构化的流应用程序,它正在监视Blob存储中的文件夹中是否有新文件,并对其进行处理。它运作良好,我可以监视和群集运行状况,请参阅...

回答 1 投票 0

Spark Streaming-访问Spark SQL数据框中的自定义案例类对象的数组

在我的Spark Streaming查询中,我想使用具有3个字符串成员的称为URL的案例类,如下所示:url:字符串域:字符串topLevelDomain:字符串我想要...

回答 1 投票 0

如何在结构化流传输中将数据帧转换为rdds?

[我使用pyspark流从kafka获取数据,结果是一个数据帧,当我将数据帧转换为rdd时,它出错了:Traceback(最近一次调用是最后一次):File“ / home / docs / dp_model / ...] >

回答 2 投票 1

如何在结构化流传输中将数据帧转换为rdds?

[我使用pyspark流从kafka获取数据,结果是一个数据帧,当我将数据帧转换为rdd时,它出错了:Traceback(最近一次调用是最后一次):File“ / home / docs / dp_model / ...] >

回答 1 投票 0

根据列值火花选择列

我的数据框架构如下:根|-值:结构(nullable = true)| |-之前:struct(nullable = true)| | |-id:长(可空=假) | |-名称:字符串(可空= ...

回答 2 投票 0

Spark Kafka集成在本地计算机上进行实时Twitter流分析的缺点是什么?

我正在使用Spark-Kafka集成来处理我的项目,该项目是在Twitter上找到最热门的主题标签。为此,我使用Kafka通过tweepy Streaming并在...

回答 2 投票 0

在本地计算机上使用Kafka进行Spark实时Twitter流分析有什么弊端?

我正在使用Spark-Kafka集成来处理我的项目,该项目是在Twitter上找到最热门的主题标签。为此,我使用Kafka通过tweepy Streaming并在...

回答 1 投票 0

TypeError:'JavaPackage'对象不可调用,并且在类路径中找不到Spark Streaming的Kafka库

我认为您应该四处移动导入,以便在导入和初始化Spark变量之前先将变量加载到环境中>>

回答 1 投票 0

TypeError:“ pyspark调用kafka流式传输时'JavaPackage'对象不可调用

我认为您应该四处移动导入,以便在导入和初始化Spark变量之前先将变量加载到环境中>>

回答 1 投票 0

为什么我的Spark Direct流向Kafka发送多个偏移量提交?

我正在Spark val中运行以下代码val sparkConf = new SparkConf().setMaster(“ local [*]”).setAppName(“ KafkaTest”).set(“ spark.streaming.kafka.maxRatePerPartition”,“ 10” ).set(“ ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.