spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

PySpark:连续流相同文件

我已经编写了下面的代码,它可以完美地工作。将新文件添加到目录后,将拾取数据并将其加载到数据框中。现在尝试流式传输同一文件,例如:Job.csv ...

回答 1 投票 1

[导入Pyspark Delta Lake模块时找不到模块错误

我正在用三角洲湖泊运行Pyspark,但是当我尝试导入三角洲模块时,出现ModuleNotFoundError:没有名为'delta'的模块。这是在没有互联网连接的机器上,因此我不得不...

回答 1 投票 0

如何在运行时更改流查询的输入目录?

我正在使用spark.readStream()。text( )运行流作业。是否可以根据当前日期动态更改输入目录?

回答 1 投票 0

使用Spark SQL流时缺少Avro自定义标题

在将Avro GenericRecord发送到Kafka之前,像这样插入标头。 ProducerRecord record = new ProducerRecord <>(topicName,key,message); record.headers()。add(“ ...

回答 1 投票 1

从Spark-shell用Kafka进行火花结构流-获取java.lang.NoClassDefFoundError:org / apache / kafka / common / serialization / ByteArraySerializer

[过去,我使用SBT为KAFKA Integration App添加,包括所有依赖项等,以用于Spark结构化流,并通过程序集生成uber jar。但这是前一阵子。我也是...

回答 1 投票 0

我们可以在Spark结构化流式批处理模式下从特定偏移量从Kafka提取数据

在kafka中,我动态地获得了新主题,我必须使用来自特定偏移量的Spark Streaming处理它。是否有可能通过变量传递json值。例如考虑...

回答 1 投票 0


如何在Spark结构化流中解析没有模式的JSON数据?

使用星型结构化流从kafka消费复杂的嵌套json数据,我没有特定的架构,因为大多数数据包含来自Web的相关requestBody和responseBody ...

回答 1 投票 0

从Spark Streaming获取异常

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...

回答 1 投票 0

Jupyter Notebook上未显示结构化流输出

我有两个笔记本。第一个笔记本正在使用tweepy从Twitter阅读推文并将其写入套接字。其他笔记本正在使用火花结构化流(Python)从该套接字读取推文...

回答 1 投票 1

使用Spark结构化流从Kafka主题中读取:Spark可以解析发布到Kafka主题的多行JSON吗?

是否可以使用结构化流解析/读取通过Spark发布到Kafka主题的多行JSON?] >>

回答 1 投票 0

是否可以在spark结构化流中更改_spark_metadata文件夹的位置?

val query = df.withColumn(“ value”,col(“ value”)。cast(StringType)).withColumn(“ value”,from_json(col(“ value”),processor.Schema)).select(unix_timestamp (col(“ timestamp”))。alias(“ kafka_time” ...

回答 1 投票 0

Spark结构化流和批处理是否相同?

我有两个火花工作。一个是批处理作业,另一个是结构化的流作业。两者都写入相同的文件接收器。两者具有相同的架构。但是,从此接收器读取数据时,仅产生火花...

回答 1 投票 0

如何通过在pyspark上插入子文档将两个文档合并为一个文档?

我有一个大问题,希望在说明要做什么时要明确。我正在尝试在pyspark(Spark结构化流)上获取Stream-Stream结构,并且我想更新相同的内容...

回答 1 投票 1

Spark结构化流未处理Kafka偏移量过期

[我们有Spark结构化的流应用程序,可将数据从Kafka推送到S3。 Spark Job可以正常运行几天,然后开始累积滞后。我们的Kafka主题有效期为6个小时。如果滞后...

回答 1 投票 0

如何将Spark结构化流连接到Redis?

我有一个用例,可实时从Redis获取数据。如何通过Spark结构化的流连接和处理数据?

回答 1 投票 0

在结构化流PySpark中动态扩展Arraytype()列

我有以下DataFrame:root |-sends:数组(nullable = false)| |-元素:整数(containsNull = true)|-元数据:数组(nullable = true)| |-元素:float(...

回答 1 投票 0

Pyspark从结构化流中的地图数组中提取值

我具有以下架构:根|-发送:数组(nullable = false)| |-元素:整数(containsNull = true)|-元数据:数组(nullable = true)| |-元素:map(containsNull = ...

回答 1 投票 0


在Intellij中可视化结构化的流writeStream输出

我在Intellij中拥有结构化流Scala应用程序。使用以下方法以writeStream()形式获取结果后:val StreamingDS = data .writeStream .format(“ json”).option(“ ...

回答 2 投票 -2

© www.soinside.com 2019 - 2024. All rights reserved.