spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark-结构化流式Kafka(动态反序列化)

假设我们在流中订阅了2个主题,一个主题是avro,另一个主题是字符串,是否可以根据主题名称动态反序列化?

回答 1 投票 0

如何获得星火结构流应用程序的kafka消费者滞后

我正在为我的Spark结构化流应用程序构建监视,并且需要让Spark应用程序消耗某个主题的消费者滞后。我相信火花驱动程序必须知道...

回答 1 投票 0

火花结构化流可视化

我正在尝试可视化结构化流中的流查询。我该怎么办?我应该使用仪表板还是其他工具?我在网上找不到任何类似的东西。 DF =火花\ ...

回答 1 投票 0

“格式错误的数据长度为负数,当尝试使用带有Avro数据源的kafka的Spark结构化流式传输时

因此,我一直在尝试使用Kafka和Avro数据来尝试Angel Conde的结构化流,并使用Avro数据进行结构化流式处理Avro然而,似乎其中的嵌套数据使我的数据有点复杂。这是我的代码,...

回答 1 投票 0

无法使用火花结构化流反序列化avro消息,其中键已字符串化,值是avro

使用Spark 2.4.0 Confluent schema-Registry接收模式消息Key在Avro中的String和Value中被序列化,因此我试图使用io.confluent.kafka来仅反序列化Value。

回答 1 投票 0

如何删除Spark结构化流创建的旧数据?

如何删除由Spark结构化流(Spark 2.4.5)创建的旧数据?我有Parquet / Avro格式(不是Delta)的HDFS数据,该数据是由Spark结构化流创建的,并由...

回答 2 投票 0

如何使用Spark结构化流从EventHub解压缩Gzip文件

是否有一种方法可以从Eventhub中读取gzip文件,并使用Spark结构化流将其解压缩,因此希望一次使用Spark结构化流触发将未压缩的json存储在ADLS中。我是...

回答 1 投票 -1

火花结构化流水印和重复副本?

[我正在尝试使用水印放置dropDuplicate,问题是水印无法清除状态,我的代码是:def main(args:Array [String]):Unit = {@ @transient lazy val log = LogManager.getRootLogger。 ..

回答 1 投票 0

如何在Spark独立集群中获取应用程序状态?

根据官方spark文档,我们可以使用spark-submit --master spark:// IP-ADDRESS:PORT --status SUBMISSION_ID来检查状态,但是当我尝试使用它时却无法...

回答 1 投票 -1

可以在完成输出模式下的Spark结构化流中丢弃/控制中间状态吗? (Spark 2.4.0)

我有一种情况,我想处理来自kafka主题的数据。我有这个特定的Java代码,可以从kafka主题中以流的形式读取数据。数据集 streamObjs = sparkSession ....

回答 1 投票 0

Scala:从Spark结构化流中读取Kafka Avro消息时出错

我一直在尝试从Scala 2.11的Spark结构化流(2.4.4)中读取Kafka的avro序列化消息。为此,我使用了spark-avro(下面的依赖项)。我生成kafka ...

回答 1 投票 0

Spark Streaming:从Kafka读取JSON并添加event_time

我正在尝试编写从Kafka读取的有状态Spark结构化流作业。作为要求的一部分,我需要在流中添加“ event_time”作为附加列。我正在尝试...

回答 1 投票 0

为什么将RDD更改为DataFrame时会发生Spark不可序列化异常?

我正在使用结构化流,并且以下代码有效val j = new Jedis()//无法序列化的redis客户端。 xx.writeStream.foreachBatch {(batchDF:DataFrame,batchId:Long)=> {j ....

回答 1 投票 0

Spark结构化流中的多个聚合和不同的函数

我需要对来自Kafka的流数据进行一些汇总,并每M秒将结果的前10行输出到控制台。 input_df =(spark .readStream .format(“ kafka”)...

回答 1 投票 0

对org.apache.spark.streaming.kafka.KafkaUtils的依赖性

我正在尝试将星光流与kafka集成在一起。我无法解决org.apache.spark.streaming.kafka.KafkaUtils的依赖关系。下面是我的build.sbt:名称:=“ StreamingTest”版本:=“ 1.0” ...

回答 1 投票 1

如何在Spark结构化流媒体中保存通过水印删除的记录

通过加水印可以自动删除Apache Spark结构化流中的旧状态数据。在结构化流编程指南.md中,字数示例演示了如何对水印进行加注...

回答 2 投票 0


Spark(直接)流是Spark结构化流还是Kafka流的自然替代品?

[在过去的几年中,我们已经开发了相当多的Spark Streaming(Direct API)应用程序,它们可以在Cloudera平台上与Kafka,IBM MQ,Hive,HBase,HDFS以及其他类型进行读写。...>>

回答 1 投票 0

火花结构化流-带有水印替代解决方案的dropDuplicates

我正在尝试使用带有水印的dropDuplicate函数对流数据进行重复数据删除。我目前面临的问题是,给定记录需要两个时间戳,一个是...

回答 1 投票 1

维护输入流数据的时间戳序列

所以我正在使用结构化流将一些json数据从kafka推送到Spark。以字典形式的数据具有“时间戳”字段。解析json并为每个键获取单独的列后,...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.