spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何将流查询的结果写入多个数据库表?

我正在使用Spark结构化流媒体,并从Kafka主题中阅读。目的是将消息写到PostgreSQL数据库的多个表中。消息模式为:root |-id:字符串(可空= ...

回答 1 投票 1

如何通过Spark Streaming按ngram进行过滤?

我正在使用Spark Streaming从S3中读取一些CSV文件。文件有3列。其中一列称为movie_plot,我需要做的是过滤掉与某些查询不匹配的记录...

回答 1 投票 0

Spark MicroBatchExecution:流查询取得了进展……真的吗?

我正在运行增量流查询,并且在“什么都没有发生”的情况下,我不断收到StreamingQueryListener拦截的数十亿次更新和QueryProgressEvent-似乎如此。为什么是...

回答 1 投票 0

Kafka的pySpark结构化流不会输出到控制台进行调试

下面是我的代码。我尝试了许多不同的选择变体,但该应用程序仍在运行,但是没有显示每秒写入的消息。我有一个Spark Streaming示例,该示例使用...

回答 1 投票 1

如何从pyspark中的kafka以字符串格式从Confluent Schema Registry获取Avro数据?

我正在从Spark中读取来自Kafka的数据(结构化流),但在Spark中从kafka获取数据的数据不是字符串格式。 Spark:2.3.4 Kafka数据格式:{“ Patient_ID”:316,“ Name”:“ Richa”,“ ...

回答 2 投票 0

火花流的结构化流中的StreamingListener的等效项

目前,我们在Spark Streaming上运行代码,我们希望将其迁移到结构化流媒体上。据我所见,StreamingListener仅用于火花流传输,当我...

回答 1 投票 0

Spark结构化流-与静态数据结合的流数据,每5分钟刷新一次

对于Spark结构化的流作业,一个输入来自kafka主题,而第二个输入是一个文件(将通过python API每5分钟刷新一次)。我需要加入这2个输入并编写...

回答 1 投票 -1

如何在pyspark中的foreach()中将行转换为字典?

我有一个从Spark生成的数据框,我想将其用于writeStream,也想保存在数据库中。我有以下代码:output =(spark_event_df .writeStream ...

回答 1 投票 2

Spark结构化流-流数据与静态数据合并,每5分钟刷新一次

对于Spark结构化的流作业,一个输入来自kafka主题,而第二个输入是一个文件(将通过python API每5分钟刷新一次)。我需要加入这2个输入并编写...

回答 1 投票 -2

_ spark_metadata / 0在压缩批处理9结构化流错误时不存在

我们有使用Spark结构化流实现的流应用程序。它将尝试从kafka主题中读取数据并将其写入HDFS位置。有时应用程序无法给出错误:...

回答 1 投票 5

在pyspark中运行foreach()时将行转换为字典

我有一个从Spark生成的数据框,我想将其用于writeStream,也想保存在数据库中。我有以下代码:output =(spark_event_df .writeStream ...

回答 1 投票 1

如何在流查询中生成摘要统计信息(使用Summarizer.metrics)?

[目前,我正在使用Spark结构化流式传输来创建(id,timestamp_value,device_id,temperature_value,comment)形式的随机数据的数据帧。每批次的Spark数据帧:基于...

回答 1 投票 0

在Spark Streaming或Spark结构化Streaming中将数据集导出为PDF / XLSX

好奇地知道我们是否可以使用Spark Streaming / Spark结构化Streaming生成PDF或XLSX文件以进行报告。根据官方文档,有文件接收器,但是否支持PDF和XLSX?如果是的话...

回答 1 投票 0

spark writeStream无法与自定义S3端点一起使用

对于Spark非常陌生,并且在使用Spark结构化流(v2.4.3)时,我试图将流数据帧写入自定义S3。我已经确保能够登录,上传数据...

回答 1 投票 0

Spark结构化流分析JSON数据

现在,我使用结构化流技术来对接Kafka数据。 Kafka中的数据为JSON格式。我收到的卡夫卡数据看起来像以下JSON数据{“ actly_payed”:“ 300.0”,“ total_amount”:“ 2893 ....

回答 1 投票 -1

如何在pyspark的流查询中生成会话窗口?

以下代码可以按预期方式进行批处理,但是对于带有臭名昭著的AnalysisException的流查询失败:从...

回答 1 投票 2

Pyspark结构化流处理

我正在尝试制作具有火花的结构化流应用程序,其主要思想是从kafka源中读取,处理输入并写回另一个主题。我已经成功地读取了火花,并且...

回答 1 投票 0

如何为流-流连接指定保留时间?

我想了解Spark中结构化流的保留时间。我有不同的Spark结构化流媒体流:流A:它每10秒钟到达一次,从时间开始...

回答 1 投票 1

如何用结构化流的writestream进行分区的文件写入?

我有一个结构化的流代码,可以从Kafka读取数据并将其转储到HDFS。转储数据时,我根据三列对数据进行分区。我面临的问题是许多小文件...

回答 1 投票 0

“ OffsetOutOfRangeException:在没有配置分区重置策略的情况下偏移超出范围”是什么意思?

我使用Apache Spark 2.4.1和kafka数据源。我有以下流查询:Dataset df = sparkSession .readStream().format(“ kafka”)...

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.