Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
使用 Kafka 进行 Spark Structured Streaming 时我们什么时候需要检查点?在 ReadStream 或 Write Stream 中?
我是 Kafka 和 PySpark+Structured Streaming 的新手,我们需要从 Kafka 主题流式传输数据并在数据经历多次转换时摄取到另一个表中。 我明白...
有没有办法在我只读的表上防止 ConcurrentAppendException?
我有一个结构化流,它从增量表会话中读取并写入增量表记录。目标是让流继续运行(这就是我认为流应该工作的方式),
在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果
输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......
Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka
我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下: df_source = app....
我已经使用自动加载器 bronze-->silver-->gold 实现了数据管道。 现在,当我这样做时,我想执行一些数据质量检查,为此我正在使用 great expectations 库。
无法使用 pyspark readstream 从 kafka 主题读取记录数组
我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 然而,实际...
我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...
使用 Spark streaming + Kafka 时如何修复过期批次?
我正在尝试使用 foreachBatch() 从 kafka 主题读取数据,如下所示。 def write_stream_batches(spark: SparkSession, kafka_df: DataFrame, checkpoint_location: str, kafkaconfig: dict): 问题...
在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢
我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...
使用 toTable 在 Databricks 中写入流不会执行 foreachBatch
下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...
使用 Kafka 驱动程序从 Azure 事件中心读取似乎没有获得任何数据
我在 Azure Databricks python 笔记本中运行以下代码: 主题 = "myeventhub" BOOTSTRAP_SERVERS = "myeventhubns.servicebus.windows.net:9093" EH_SASL = "kafkas...
Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加
在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“ou ...
writeStream()在批次数据中打印空值,即使我在kafka中通过writeStream()提供适当的json数据。
我试图使用模式转换json,并将值打印到控制台,但writeStream()在所有列中打印空值,即使我给了适当的数据。数据我给kafka主题...{"股票":。
在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?
我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...
是否可以在每次触发时,在聚合时间窗口结束前,输出聚合数据?上下文。我正在开发一个应用程序,从Kafka主题读取数据,处理数据,... ...
我写了下面的代码,我想从kafka读取并写入按年、月、日和小时分区的parquet文件。在dag中我看到一个排序操作(如下图)。这个排序操作是不是要...
我试图连接我的结构化流火花2.4.5与kafka,但所有的时间,我试图这个数据源提供者错误的出现,按照我的scala代码和我的sbt构建:和错误是:和我的sbt.build是:谢谢你!。按照我的scala代码和我的sbt构建:导入org......
我正在运行一个spark结构化流作业,其中包括创建一个空的数据框架,使用每个微批更新它,如下所示。随着每一个微批处理的执行,阶段数增加......。
什么是需要修复的数据。如何决定spark中Reparation的大小。修复的概念是否适用于spark流和结构化流。DF.Repartion(num)
我试图运行Python Spark结构化流+Kafka,当我运行Master@MacBook-Pro命令 spark-3.0.0-preview2-bin-hadoop2.7 % binspark-submit --packages org.apache.spark:spark-sql-...