Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
我有一个DStream [String,String],我需要将其转换为RDD [String,String]。有什么办法吗?我需要使用Scala语言。在此先感谢!
Spark Streaming Kafka中的DStream过滤和偏移管理
我目前正在编写一个Spark流应用程序,该应用程序会从Kafka读取数据并尝试在应用某些转换之前对其进行解码。当前的代码结构如下:val stream = ...
我很新,从pyspark开始,我正在学习使用pyspark将数据从kafka推送到配置单元。从pyspark.sql导入从pyspark.sql.functions导入的SparkSession从pyspark爆炸。...
如何在Spark Structure Streaming中使用JSON数据而不是JSON路径
我在变量jsondata中具有数据,如下所示[{'sno':1,'number':'000-00-00000'}]如何在Spark中进行结构流传输时在JSON()中使用此数据期待我不走的路...
我正在使用Apache Nifi以及Spark和Kafka在它们之间发送消息。首先,我使用Nifi采集数据,然后将其发送到Spark进行处理。然后,我再次将数据从Spark发送到Nifi以插入...
我是kinesis的新手,我正尝试通过spark-streaming(Pyspark)处理kinesis流数据,并遇到以下错误以下是我的代码:我正在将twitter数据推送到我的kinesis流中,并...]] >
正在获取:导入Spark模块时出错:没有名为'pyspark.streaming.kafka'的模块
我需要将从pyspark脚本创建的日志推送到kafka。我做POC,所以在Windows机器中使用Kafka二进制文件。我的版本是-kafka-2.4.0,spark-3.0和python-3.8.1。我是...
[使用PySpark的Kafka createDirectStream
我的主要目的是连接Kafka,创建一个DStream,将其保存为本地变量作为行,并将其写入mongo db,并在PySpark中具有End to End流。但是我在......>
在我的火花运动流应用程序中,我正在使用foreachBatch来获取流数据,并且需要将其发送到drools规则引擎以进行进一步处理。我的要求是,我需要...
是否可以通过spark直接将Writestream用于API
我在Databricks上构建代码以实时读取增量表(读取流),然后需要将此流数据发布到API。在我阅读的所有论文中,writestream仅用于创建文件(.csv,....
我在我的项目中使用spark-sql-2.3.1v和带有Java8的kafka。我正在尝试将主题接收的byte []转换为kafka使用者方面的数据集。这是我拥有的类Company {String ...
输入参数zkQuorum和要传递给Kafkautils.createStream方法的组
我正在尝试执行以下代码,该代码从Kafka Producer中提取消息并进行字数统计。这段代码来自Github。 https://github.com/apache/spark/blob/master/examples/src / ...
我在Databricks上构建代码以实时读取增量表(读取流),然后需要将此流数据发布到API。在我阅读的所有论文中,writestream仅用于创建文件(.csv,....
Kafka-pyspark流式传输:KafkaException:无法构造kafka使用者
我正在尝试使用以下代码通过pyspark订阅Kafka主题:spark = SparkSession.builder.appName(“ Spark结构化流式传输Kafka”)。getOrCreate()行= spark ....
我有一个格式为:ID |值1 |名称:abc; org:tec; salary:5000 2 | org:Ja; Designation:Lead的文件,如何与数据框一起阅读?所需的输出是:1,name,abc 1,org,tec 2,org,Ja 2,...
Spark Streaming作业完全在HD Insights上7天后失败,没有错误日志
我们得到的唯一错误日志是:由于ApplicationMaster尝试使appattempt_1575912755011_0035_000005超时,因此应用程序application_1575912755011_0035失败了5次。申请失败。...
运行Apache Hudi deltastreamer时出错
我试图在AWS EMR上运行Hudi deltastreamer。遵循此博客中的步骤。 https://cwiki.apache.org/confluence/pages/viewrecentblogposts.action?key=HUDI但是当我运行下面的spark提交时,...
使用火花流传输(每5分钟),我将数据作为拼花形式存储在HDFS中。 /data/yyyy-MM-dd/*.parquet例如:/ data / 2020-02-02 / * parquet每个Parquet文件的大小仅以KB为单位每个文件夹...
[目前,我有一些pyspark代码,该代码正在从kafka主题中读取数据(readStream),并且我正计划使用事务更新SQL Server表。流数据将具有全部三个INSERT,...
我是IT领域的新手,我可以使用Google搜索解决大多数问题,但是这次我无法成功完成,所以我决定寻求帮助,呵呵。我在Scala中编写代码,其中...