spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何在 Spark Streaming 作业中查找数据帧的大小

我正在尝试查找每个批次中 Spark 流作业中数据帧的大小。我能够成功地找到批处理作业的大小,但是当涉及到流式传输时,我无法做到......

回答 2 投票 0

Spark 结构化流的分组和排序

我有一个用例,其中有流数据集,例如手机号码、开始时间和通话持续时间。 我需要对手机号码进行分组,并根据开始时间对组进行排序并过滤掉呼叫

回答 1 投票 0

优雅地停止结构化流查询

我正在使用 Spark 2.1 并尝试优雅地停止流查询。 StreamingQuery.stop() 是否是一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息: ...

回答 5 投票 0

当一个kafka主题分区中没有数据时,结构化流作业失败

当我尝试通过偏移量的时间戳从 kafka 主题检索数据时,作业失败并出现错误: 原因是:java.lang.AssertionError:断言失败:没有与 topic-par 的请求匹配的偏移量...

回答 2 投票 0

配置Apache Spark的MemoryStream来模拟Kafka流

我被要求研究使用 Apache Spark 的 MemoryStream 在 Java Spring Boot 服务中模拟 Kafka 流。文档/在线社区在这个主题上有点小,所以我...

回答 1 投票 0

includeExistingFiles: false 在 Databricks Autoloader 中不起作用

使用自动加载器从 adls gen2 获取文件。但是,我只想摄取新文件。使用以下配置仍然无法阻止现有文件被摄取。还有人吗...

回答 2 投票 0

DLT - 视图 v 物化视图语法以及如何声明?

我正在使用徽章架构创建 DLT 管道。在 Silver 中,我使用 CDC/SCD1 按日期获取最新的 id,工作正常,但我对 @dlt.view 包装器有疑问。 我现在的

回答 1 投票 0

固定间隔微批次和AvailableNow 触发器

“固定间隔微批次”和“AvailableNow”触发器之间的根本区别是什么? 我发现有关这些内容的文档令人困惑。 根本不同吗...

回答 1 投票 0

带有分组数据的 Spark 结构化流 - 每组一个微批次

如果对流数据帧数据进行分组,是否可以在 Spark 结构化流中以单独的单个微批次处理每个组?像这样的东西: dfs = ... dfs.groupBy(...).writestrea...

回答 1 投票 0

在 Spark 结构化流中对 foreachBatch 操作应用定义的函数时出现 STREAMING_CONNECT_SERIALIZATION_ERROR

我正在使用 Spark 结构化流,但偶然发现了一个问题,但我看不到问题的根本原因和解决方案。 我定义了一个包含函数的 Reader 类

回答 1 投票 0

有状态 Spark Streaming 的 SST 文件数量无限增长

我们正在 Databricks 上运行一个非常简单的 Apache Spark Streaming 应用程序。它使用来自 Apache Kafka 的消息,基于 1 小时水印进行重复数据删除,并写入输出...

回答 1 投票 0

Kafka 删除(逻辑删除)未更新 Spark 结构化流中的最大聚合

我正在对 Spark 结构化流 (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算最大日期和所有时间的最大百分比(否

回答 1 投票 0

使用 TriggerAvailableNow 和 Eventhubs 进行 Spark 结构化流处理

我一直在尝试将数据块中的增量表中的事件写入事件集线器,并且在尝试使其与 Trigger availableNow=True 一起使用时遇到了问题, 我基本上是在收集...

回答 1 投票 0

PySpark UDF - 读写其他数据帧

在 UDF 中,我想将增量表读入数据帧,根据其内容更新应用 UDF 的实际数据帧的行,然后更新增量表。我会用...

回答 1 投票 0

PySpark 结构化流每批 2 个 SQL(长 addBatch 执行)

我有一个 Pyspark 结构化流应用程序(3.3.2),它需要使用微批次从 Kafka 读取输入,执行复杂的逻辑,其中包括连接来自几个数据帧的数据。 该应用程序是

回答 1 投票 0

为什么spark既需要预写日志又需要检查点?

为什么spark既需要预写日志又需要检查点? 为什么我们不能只使用检查点?另外使用预写日志有什么好处? 存储的数据有什么区别...

回答 1 投票 0

在 pyspark 流中保存数据帧

我希望在流中存储 PySpark DataFrame,针对每个批次对其进行更改,然后使用 foreachBatch 再次保存更新的 DataFrame。实现这一目标的最简单方法是什么。 我...

回答 1 投票 0

如何测试 Databricks 结构化流中的端到端延迟?

我是结构化流媒体新手,并尝试在结构化流媒体中读取数据进行性能测试 我想测试不同的场景,例如,不同的集群大小、不同的数量......

回答 1 投票 0

Apache Spark 结构化流中检查点和预写日志的说明

Apache Spark 结构化流文档中的“检查点”和“预写日志”是什么意思? 我怎样才能更好地理解这些概念? 有什么先决条件吗...

回答 1 投票 0

无法在google colab中使用pyspark从Kafka读取流数据

我正在 google colab 上运行 pyspark。我已经设置了 Kafka 并在主题中添加了一个 csv 文件。如果我不使用结构化流从 kafka 读取数据,我就可以读取数据并打印它。 然而,...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.