spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark消费者使用docker运行时找不到kafka主题分区

当我提交连接到 kafka 代理的 Spark 应用程序时,它会执行 kafka 查询,但不会将任何内容返回到控制台。找不到主题分区。 这是我的日志

回答 1 投票 0

使用PySpark结构化流,如何通过WebSocket将处理后的数据发送到客户端

我在应用程序中使用 PySpark 结构化流,其中使用 readStream 从 Apache Iceberg 表中读取附加数据。在 PySpark 框架内处理数据后,我...

回答 1 投票 0

(为什么)Spark Structured Streaming 会重新编译每个小批量的代码

我有一个 Spark 结构化流作业,从 Kafka 读取数据,解析 avro,分解列,计算一些额外的列作为现有列的简单组合(总和/乘积/除法),然后写...

回答 1 投票 0

DataBricks 自动加载器与输入源文件删除检测

在连续从源 s3 存储桶中提取文件时,我希望能够检测到文件被删除的情况。据我所知,自动加载器无法处理检测...

回答 1 投票 0

Spark 结构化流 - 检查点元数据无限增长

我使用spark结构流3.1.2。我需要使用 s3 来存储检查点元数据(我知道,这不是检查点元数据的最佳存储)。压缩间隔是10(默认),我设置了spar...

回答 2 投票 0

如何从特定事件中心分区 Spark-Streaming 结构读取

我有一个具有 32 个分区的事件中心。 我需要使用 Pyspark 从事件中心读取分区 1 。 这是我现有的代码 # 配置 连接字符串 = "端点=sb://abcd" 事件中心名称...

回答 1 投票 0

登录 Spark 结构化流

我能够开发一个管道,从 kafka 读取数据并进行一些转换,并将输出写入 kafka 接收器以及 parque 接收器。我想添加有效的日志记录来记录

回答 1 投票 0

从 kafka 读取的 Spark 结构化流作业未显示在 kafka 消费者组中

我使用 pyspark 创建了一个 Spark 流作业,它使用 readStream 从 kafka 主题读取数据,并使用 writeStream 写入 Oracle 数据库中的表。 作业可以成功读取...

回答 1 投票 0

Python/PySpark - 以编程方式将 json_string 列发送到 REST API

我有一个数据帧,我使用 Spark Structured Streaming .readStream() 进行流式传输: ID json_数据 123 {颜色:“红色”,值:“#f00”} 125 {颜色:“蓝色”,值:“...

回答 1 投票 0

spark 结构化流作业如何处理流 - 静态 DataFrame 连接?

我有一个 Spark 结构化流作业,它从 cassandra 和 deltalake 读取映射表并与流 df 连接。我想了解这里的确切机制。火花会击中这些吗

回答 1 投票 0

如何在 Spark Streaming 作业中查找数据帧的大小

我正在尝试查找每个批次中 Spark 流作业中数据帧的大小。我能够成功地找到批处理作业的大小,但是当涉及到流式传输时,我无法做到......

回答 2 投票 0

Spark 结构化流的分组和排序

我有一个用例,其中有流数据集,例如手机号码、开始时间和通话持续时间。 我需要对手机号码进行分组,并根据开始时间对组进行排序并过滤掉呼叫

回答 1 投票 0

优雅地停止结构化流查询

我正在使用 Spark 2.1 并尝试优雅地停止流查询。 StreamingQuery.stop() 是否是一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息: ...

回答 5 投票 0

当一个kafka主题分区中没有数据时,结构化流作业失败

当我尝试通过偏移量的时间戳从 kafka 主题检索数据时,作业失败并出现错误: 原因是:java.lang.AssertionError:断言失败:没有与 topic-par 的请求匹配的偏移量...

回答 2 投票 0

配置Apache Spark的MemoryStream来模拟Kafka流

我被要求研究使用 Apache Spark 的 MemoryStream 在 Java Spring Boot 服务中模拟 Kafka 流。文档/在线社区在这个主题上有点小,所以我...

回答 1 投票 0

includeExistingFiles: false 在 Databricks Autoloader 中不起作用

使用自动加载器从 adls gen2 获取文件。但是,我只想摄取新文件。使用以下配置仍然无法阻止现有文件被摄取。还有人吗...

回答 2 投票 0

DLT - 视图 v 物化视图语法以及如何声明?

我正在使用徽章架构创建 DLT 管道。在 Silver 中,我使用 CDC/SCD1 按日期获取最新的 id,工作正常,但我对 @dlt.view 包装器有疑问。 我现在的

回答 1 投票 0

固定间隔微批次和AvailableNow 触发器

“固定间隔微批次”和“AvailableNow”触发器之间的根本区别是什么? 我发现有关这些内容的文档令人困惑。 根本不同吗...

回答 1 投票 0

带有分组数据的 Spark 结构化流 - 每组一个微批次

如果对流数据帧数据进行分组,是否可以在 Spark 结构化流中以单独的单个微批次处理每个组?像这样的东西: dfs = ... dfs.groupBy(...).writestrea...

回答 1 投票 0

在 Spark 结构化流中对 foreachBatch 操作应用定义的函数时出现 STREAMING_CONNECT_SERIALIZATION_ERROR

我正在使用 Spark 结构化流,但偶然发现了一个问题,但我看不到问题的根本原因和解决方案。 我定义了一个包含函数的 Reader 类

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.