spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

pyspark .writeStream.format("memory") 未检测到现有数据

我正在测试 pyspark 流,仅使用从文件夹读取的 csv 文件。 当我使用 writeStream.format("console") 时,查询会立即检测到我的 csv 文件并将其输出到

回答 1 投票 0

Delta 实时表使用哪种基于时间的分区策略

我正在使用 Spark-streaming 每 5 分钟摄取一次实时事件流并附加到增量实时表中。该表可以由下游数据管道摄取和处理,也可以直接...

回答 1 投票 0

从巨大的数据框中删除重复的列

所以假设我有巨大的数据帧,并且我使用 pyspark 中的“左”连接来加入它们。现在加入他们后,我发现一些列名称由于相同的列被重复而被重复...

回答 1 投票 0

Databricks Autoloader 写入流

我在azure datalake中加载了多个表(每个表的csv文件),并且想使用自动加载器加载Databricks Delta表中的每个表。 我有一个 python 代码,我使用 for 循环来

回答 1 投票 0

如何在Pyspark中不截断地输出结构化流?

我正在尝试将结构化流结果输出到控制台: .writeStream \ .outputMode("追加") \ .format(“控制台”) \ 。开始() 输出表如下所示: +-----------------...

回答 1 投票 0

将 Spark Streaming 状态保存到外部数据库

如果 Spark 应用程序代码有任何更改,Spark Streaming 检查点将无法工作...所以我想将状态信息显式保存到像 cassandra 这样的外部数据库中。 如何冲洗火花

回答 2 投票 0

使用 Abris 从 Kafka 读取 Spark 流,最新的架构注册表不会同步

我有一个 Spark 流,它从 kafka avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来做到这一点,它看起来像这样, 导入 za.co.absa.abris.

回答 1 投票 0

使用 Apache Spark 进行结构化流处理

我可以在结构化流中创建多个批次,而其中只有一个包含连续数据的文件吗? 我已经使用多个输入文件创建了多个批次。但是,现在我想生成多个

回答 1 投票 0

Spark 连续结构化流不显示输入速率或处理速率指标

我正在独立集群上运行我的 Spark 连续结构化流应用程序。但是我注意到平均输入/秒或平均进程/秒等指标没有在结构上显示(作为 NaN)...

回答 1 投票 0

Spark Streaming 不执行 foreach 中的代码行

关于 Spark Streaming 的快速问题。 我正在将 KafkaUtils 中的 createDirectStream 初始化为流,并将其保存为 Spark-streaming 中的 InputDStream,如下所示。 val 流:InputDStream[ConsumerRecord[

回答 1 投票 0

Databricks Delta Live 表只是在 CDC 和 SCD 之后覆盖吗?

您好 Databricks 社区, 目前我面临以下问题,我正在尝试为此找到一个好的解决方案。我使用 DLT 开发具有多跳架构的管道。 用于摄取

回答 1 投票 0

从 Spark 数据集中检索字符串类型列作为字符串变量,以将其作为 Redis 缓存的“键”传递

我正在尝试使用 Spark Streaming 从 kafka 主题读取数据。 来自 kafka 的消息是一个 JSON,我将其作为字符串存储在下面的数据集的值列中。 示例消息:只是一个

回答 1 投票 0

Databricks Notebook 中推断架构失败

我在Databricks中编写了一个spark结构化流。第一段代码是检查我的实体是否存在增量表。如果没有,则创建增量表。在这里,我想我们...

回答 1 投票 0

流媒体应用程序中的旁线模式

我有一个 Spark 流应用程序。它需要一批记录并对记录执行多个映射函数。 当少数记录在 .map 阶段失败时,我希望能够知道原始 id/re...

回答 1 投票 0

Spark 流到 Azure cosmos DB

我正在尝试使用 Spark Stream 将聚合数据流式传输到 Azure Cosmos DB。示例 Spark Stream 应用程序从 n/w 控制台获取输入,然后对其应用聚合并尝试编写...

回答 1 投票 0

即使将“orc.force.positional.evolution”设置为 false hive 仍然会根据位置进行拾取

我有一个外部表,我在其中添加了一些新列,并希望确保orc格式文件中的数据应根据列名称从Spark数据帧写入Hive外部表...

回答 1 投票 0

如何将 Kafka 中的字节转换为其原始对象?

我从 Kafka 获取数据,然后使用默认解码器反序列化 Array[Byte],之后我的 RDD 元素看起来像 (null,[B@406fa9b2), (null,[B@21a9fe0) 但我想要我的原始数据...

回答 2 投票 0

在 Spark 结构化流中使用流数据帧更新静态数据帧

我有一个类似的用例,如如何在 Spark 结构化流中使用流数据帧更新静态数据帧。您可以从上述帖子中获取相同的数据作为示例。 静态_df = ...

回答 1 投票 0

群组成员支持的协议与现有成员不兼容

我面临与 Kafka 相关的问题。 我当前的服务(生产者)将消息发送到 Kafka 主题(事件)。该服务使用kafka_2.12 v1.0.0,用Java编写。 我正在尝试...

回答 4 投票 0

编写 Spark 提交命令

我是 Spark 新手。 我有一个具有以下配置的集群: 节点数量:10 每个节点的核心数量:16 每个节点的内存 (RAM):64GB 这是我的火花提交命令: 火花提交--master ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.