spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何使用writeStream将Spark流传递给kafka主题

我正在使用提供流的twitter流功能。我需要使用Spark writeStream函数,如:writeStream函数link //将键值数据从DataFrame写入特定的Kafka ...

回答 1 投票 1

为什么在结构化流中每个阶段最多要处理200个任务?

我有一个在yarn模式下运行的spark结构化流应用程序。我正在尝试减少任务数量,并且我注意到大多数阶段都有200个任务。并且我设置了--conf“ spark.sql.shuffle ....

回答 1 投票 0

如何使用(Py)Spark结构化流从Kafka定义JSON记录(带时间戳)的架构?

问题是,我在使用PySpark阅读Kafka消息后得到了空值。我使用Spark 2.3.1 / Scala 2.11.12我的代码:allData = spark \ .readStream \ .format(“ kafka”)\ .option(“ ...

回答 1 投票 2

如何使用(Py)Spark结构化流从Kafka消息中定义JSON记录(带有时间戳)的架构?

问题是,我在使用PySpark阅读Kafka消息后得到了空值。我使用Spark 2.3.1 / Scala 2.11.12我的代码:import os#os.environ ['PYSPARK_SUBMIT_ARGS'] ='--packages org ....

回答 1 投票 2

Spark结构化流2.3.0中的水印

我在Spark Structured Streaming 2.3.0中从Kafka读取数据。数据包含有关某些教师的信息,其中包括TeacherId,teacherName和TeacherGroupsIds。 TeacherGroupsIds是一个数组列...

回答 1 投票 1

如何处理JSON文档(来自MongoDB)并在结构化流中写入HBase?

将流式方法从spark kafkastreaming更改为结构化流式,所以使用kafkaUtils的较早方法正在生成Dstream [Document]。在结构化流式处理中,我正在获取Dataset [Document] ...

回答 1 投票 1

如何在结构化流中的pyspark中使用foreach运算符(失败的'DataStreamWriter'对象没有属性'foreach')?

我在pyspark 2.3.4中使用结构化流。我试图按如下方式使用foreach运算符:query = projection.writeStream.format('console')。foreach(process_row).start()我收到以下错误:...

回答 1 投票 1

'DataStreamWriter'对象没有属性'foreach'-pyspark

我正在pyspark中使用结构化流进行情感分析。我正在尝试将我的预测显示为:def process_row(row):.... query1 = projection.writeStream.outputMode(“ update”)。format('...

回答 1 投票 0


如何在PySpark中使用foreach或foreachBatch写入数据库?

我想使用Python(PySpark)从Kafka源到MariaDB进行Spark结构化流(Spark 2.4.x)。我想使用流式Spark数据框,而不是静态或Pandas数据框。它...

回答 1 投票 1

在Pyspark中使用foreach和foreachBatch进行Spark结构化流式传输到数据库

我想使用Python(PySpark)从Kafka源到MariaDB进行Spark结构化流(Spark 2.4.x),以对此管道进行测试。我想使用流式Spark数据帧,而不是...

回答 1 投票 0

如何计算Spark结构化流中的滞后差异?

我正在编写一个Spark结构化流程序。我需要创建一个带有滞后差的附加列。为了重现我的问题,我提供了代码片段。此代码使用了data.json文件...

回答 1 投票 6

如何从kafka读取数据并在spark中进行处理时在数据帧中映射案例类对象的Json字符串

案例类WebsitesData(id:Int,websiteType:字符串,zippedData:Array [字节])+ -------------------- + |值| + -------------------- + | {“ id”:0,“ websiteT ... | | {” id“:1,” websiteT ... | | {“ id” ...

回答 1 投票 -1

如何针对Spark中的所有字段以语法形式将Struct Type生成为StringType?

我有* n个字段(例如200-300),所有字段Struct Type我只想作为字符串类型。那里有任何简单的方法,例如下面提到的val schema = StructType(schemaString.split(“”).map(...

回答 1 投票 0

是否计划为每个微批处理执行的流查询(结构化流)的逻辑计划?

我在笔记本电脑上设置了一个小型测试,该测试执行以下操作:我创建了一个Kafka主题,其中包含几千条消息,其中每条消息包含几行,每行约100列。创建300 ...

回答 1 投票 2

如何使用流查询处理来自Kafka的Scala案例类对象?

我正在使用Kafka + Spark集成,在其中我要发送案例类对象(网站)并映射到spark中。案例类Website(id:Int,name:String)隐式val productSchema = Encoders.product [...

回答 1 投票 0

从Kafka读取时如何在Spark中对Case类对象进行反序列化?

我正在使用Kafka + Spark集成,在其中我要发送案例类对象(网站)并映射到spark中。案例类Website(id:Int,name:String)隐式val productSchema = Encoders.product [...

回答 1 投票 0

用于Kafka主题的PySpark结构化流的Cassandra Sink

我想使用PySpark结构化流API将结构流数据写入Cassandra。我的数据流如下所示:REST API-> Kafka-> Spark Structured Streaming(PySpark)-> Cassandra ...

回答 1 投票 0

如何访问数据源选项(例如kafka)?

我正在设置Spark批处理选项以从Kafka使用,但是当我尝试获取config属性时,它显示为None。为什么这样 ? val df = sparkSession .read .format(“ org ....

回答 1 投票 0

如何访问数据源选项(例如kafka)?

我正在设置Spark批处理选项以从Kafka使用,但是当我尝试获取config属性时,它显示为None。为什么这样 ? val df = sparkSession .read .format(“ org ....

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.