spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Spark-Straming中的DStream到Rdd

我有一个DStream [String,String],我需要将其转换为RDD [String,String]。有什么办法吗?我需要使用Scala语言。在此先感谢!

回答 1 投票 0

Spark Streaming Kafka中的DStream过滤和偏移管理

我目前正在编写一个Spark流应用程序,该应用程序会从Kafka读取数据并尝试在应用某些转换之前对其进行解码。当前的代码结构如下:val stream = ...

回答 1 投票 4

使用pyspark将数据从kafka写入蜂巢-卡住

我很新,从pyspark开始,我正在学习使用pyspark将数据从kafka推送到配置单元。从pyspark.sql导入从pyspark.sql.functions导入的SparkSession从pyspark爆炸。...

回答 1 投票 0

如何在Spark Structure Streaming中使用JSON数据而不是JSON路径

我在变量jsondata中具有数据,如下所示[{'sno':1,'number':'000-00-00000'}]如何在Spark中进行结构流传输时在JSON()中使用此数据期待我不走的路...

回答 1 投票 1

怎么可能? Kafka队列中有重复的记录吗?

我正在使用Apache Nifi以及Spark和Kafka在它们之间发送消息。首先,我使用Nifi采集数据,然后将其发送到Spark进行处理。然后,我再次将数据从Spark发送到Nifi以插入...

回答 1 投票 0

无法使用运动学流在火花流中创建流

我是kinesis的新手,我正尝试通过spark-streaming(Pyspark)处理kinesis流数据,并遇到以下错误以下是我的代码:我正在将twitter数据推送到我的kinesis流中,并...]] >

回答 1 投票 1

正在获取:导入Spark模块时出错:没有名为'pyspark.streaming.kafka'的模块

我需要将从pyspark脚本创建的日志推送到kafka。我做POC,所以在Windows机器中使用Kafka二进制文件。我的版本是-kafka-2.4.0,spark-3.0和python-3.8.1。我是...

回答 1 投票 0

[使用PySpark的Kafka createDirectStream

我的主要目的是连接Kafka,创建一个DStream,将其保存为本地变量作为行,并将其写入mongo db,并在PySpark中具有End to End流。但是我在......>

回答 1 投票 1

如何在Spark执行器上收集所有记录并作为批处理

在我的火花运动流应用程序中,我正在使用foreachBatch来获取流数据,并且需要将其发送到drools规则引擎以进行进一步处理。我的要求是,我需要...

回答 2 投票 1

是否可以通过spark直接将Writestream用于API

我在Databricks上构建代码以实时读取增量表(读取流),然后需要将此流数据发布到API。在我阅读的所有论文中,writestream仅用于创建文件(.csv,....

回答 1 投票 -1

如何解码List 的字节[]到数据集 在spark?

我在我的项目中使用spark-sql-2.3.1v和带有Java8的kafka。我正在尝试将主题接收的byte []转换为kafka使用者方面的数据集。这是我拥有的类Company {String ...

回答 1 投票 -1

输入参数zkQuorum和要传递给Kafkautils.createStream方法的组

我正在尝试执行以下代码,该代码从Kafka Producer中提取消息并进行字数统计。这段代码来自Github。 https://github.com/apache/spark/blob/master/examples/src / ...

回答 1 投票 0

可以通过spark直接在API中使用Writestream

我在Databricks上构建代码以实时读取增量表(读取流),然后需要将此流数据发布到API。在我阅读的所有论文中,writestream仅用于创建文件(.csv,....

回答 1 投票 0

Kafka-pyspark流式传输:KafkaException:无法构造kafka使用者

我正在尝试使用以下代码通过pyspark订阅Kafka主题:spark = SparkSession.builder.appName(“ Spark结构化流式传输Kafka”)。getOrCreate()行= spark ....

回答 2 投票 -1

如何使用数据帧在Spark-scala中读取此自定义文件

我有一个格式为:ID |值1 |名称:abc; org:tec; salary:5000 2 | org:Ja; Designation:Lead的文件,如何与数据框一起阅读?所需的输出是:1,name,abc 1,org,tec 2,org,Ja 2,...

回答 1 投票 0

Spark Streaming作业完全在HD Insights上7天后失败,没有错误日志

我们得到的唯一错误日志是:由于ApplicationMaster尝试使appattempt_1575912755011_0035_000005超时,因此应用程序application_1575912755011_0035失败了5次。申请失败。...

回答 1 投票 0

运行Apache Hudi deltastreamer时出错

我试图在AWS EMR上运行Hudi deltastreamer。遵循此博客中的步骤。 https://cwiki.apache.org/confluence/pages/viewrecentblogposts.action?key=HUDI但是当我运行下面的spark提交时,...

回答 1 投票 0

Spark-将分区数减少为读取的文件夹数

使用火花流传输(每5分钟),我将数据作为拼花形式存储在HDFS中。 /data/yyyy-MM-dd/*.parquet例如:/ data / 2020-02-02 / * parquet每个Parquet文件的大小仅以KB为单位每个文件夹...

回答 1 投票 0

火花流数据以更新SQL Server(事务)

[目前,我有一些pyspark代码,该代码正在从kafka主题中读取数据(readStream),并且我正计划使用事务更新SQL Server表。流数据将具有全部三个INSERT,...

回答 1 投票 0

用于标签的标量正则表达式

我是IT领域的新手,我可以使用Google搜索解决大多数问题,但是这次我无法成功完成,所以我决定寻求帮助,呵呵。我在Scala中编写代码,其中...

回答 1 投票 2

© www.soinside.com 2019 - 2024. All rights reserved.