spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

PySpark 数据框转换 pyspark

我有一个下面的数据框,我需要将其转换如下。 我正在使用 PySpark 3.4.1 。 +------------------------+------------------------ -------------------------------------------------- ---------...

回答 1 投票 0

Spark 结构化流因 Kubernetes 中执行器内存不足而失败

我正在运行一个 Spark 结构化流作业,从对象存储桶中读取数据,进行一些转换和过滤,调用 groupby 和聚合。将写入内容发布到对象存储中我...

回答 1 投票 0

结构化流+Kafka集成numInputRows为0但偏移量增加

结构化流+ Kafka集成,spark版本3.3,我发现了一些奇怪的东西,一开始我可以得到正确的numInputRows,但是大约5次获取numInputRows将是0,bue endOff...

回答 1 投票 0

pyspark 中的用例流

我正在 Azure 上使用 Databricks,我的数据托管在 ADLS2 上。 当前运行时版本是 10.4 LTS(如果需要我可以升级) 我有一个表 Pimproduct: ID 姓名 行动 dlk_last_modified

回答 1 投票 0

Spark 流使用 Abris 从 Kafka 读取,并且最新的架构注册表不会同步

我有一个 Spark 流,它从 Kafka Avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来执行此操作,看起来就像这样。 导入 za.co.absa...

回答 1 投票 0

kafka消费者组在spark结构化流媒体中的意义

计划构建 Spark 结构化流应用程序,该应用程序从 Kafka 主题读取 json 数据,解析数据并写入任何存储。 val df = 火花 .readStream .format("卡夫克...

回答 1 投票 0

spark scala kafka avro 反序列化器

我正在使用来自kafka的流数据帧。 value 列中的数据采用 avro 格式。我想将数据反序列化为结构类型。 目前我已经编写了一个 udf 函数来调用该方法

回答 1 投票 0

使用 Apache Spark 进行结构化流处理

我可以在结构化流中创建多个批次,而其中只有一个包含连续数据的文件吗? 我已经使用多个输入文件创建了多个批次。但是,现在我想生成多个

回答 1 投票 0

使用 Spark 进行文件流传输

我正在使用来自 s3 的 Spark 文件流。代码运行良好,但对于第一批,所有文件都被视为目录中的有效文件。请帮助我是火花新手。 我的代码如下。 我正在使用来自 s3 的 Spark 文件流。代码运行良好,但对于第一批,所有文件都被视为目录中的有效文件。请帮助我是火花新手。 我的代码如下。 <pre> Dataset<Row> df = spark.readStream().format("csv").schema(getOLRSchema()).load( "s3a://filePath/*.csv"); query = df.writeStream().foreachBatch(new VoidFunction2<Dataset<Row>, Long>() { @Override public void call(Dataset<Row> ds, Long batchId) throws Exception { long timestamp = System.currentTimeMillis(); ds.write().parquet("hdfs://ha-cluster/testStreaming/"); System.out.println("Time taken to complete: " + (System.currentTimeMillis() - timestamp)); } }) .option("checkpointLocation", "hdfs://ha-cluster/mnt/vol2/Ajit/checkpoint") .option("latestFirst", "true") .option("maxFilesPerTrigger", "32") .trigger(Trigger.ProcessingTime(120000)) .outputMode("append") .start(); query.awaitTermination(); </pre> Spark 文档解释说,只有新注意到的文件才会被处理,现有的文件将被忽略

回答 1 投票 0

Spark 连续结构化流不显示输入速率或处理速率指标

我正在独立集群上运行我的 Spark 连续结构化流应用程序。但是我注意到平均输入/秒或平均进程/秒等指标没有在结构上显示(作为 NaN)...

回答 1 投票 0

无法在Jupyter笔记本上使用kafka jars

我正在使用 Spark 结构化流从单节点 Kafka 读取数据。在 Mac 上本地运行以下设置。我可以通过 Spark-Submit 阅读,但在 Jupyter Notebook 中不起作用。 来自 pyspark.sql 我...

回答 1 投票 0

Databricks Notebook 中推断架构失败

我在Databricks中编写了一个spark结构化流。第一段代码是检查我的实体是否存在增量表。如果没有,则创建增量表。在这里,我想我们...

回答 1 投票 0

在流式传输和更新插入到增量表时保留分区

我目前正在使用 writestream 进行流式传输以写入现有的增量表,以便添加附加属性。 我正在使用 foreachBatch 函数,其中包含用户定义的函数

回答 1 投票 0

如何通过指定开始和结束时间戳,使用 kinesis 或 kafka 中的 databricks 读取历史数据?

可以说我想读取2023年3月8日至2023年3月14日期间到达的数据 有没有一种方法可以定义结束位置以及下面的初始位置。 Spark.readStream.format...

回答 1 投票 0

使用开放表格式的性能不佳

我有一个现有的案例: 其中每天从多个配置单元表读取整个/完整数据, 如 SQL 查询中所述,对其进行处理/转换(连接、聚合、过滤等)。 这些 SQL 查询是

回答 1 投票 0

Spark Kafka 源和 Confluence 监控拦截器

配置 Spark 使用 Kafka 时,如下所示: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations 火花人声明如下...

回答 1 投票 0

获取具有最新时间戳的数据帧行(Spark 结构化流)

我有这个数据框: +------+--------------------+------------+------------ --------------+------------------------+ |品牌 |原始时间戳 |重量 |到达时间戳 |功能 ...

回答 2 投票 0

获取具有最新时间戳的数据帧行

我有这个数据框: +------+--------------------+------------+------------ --------------+------------------------+ |品牌 |原始时间戳 |重量 |到达时间戳 |功能 ...

回答 1 投票 0

Spark 结构化流中按问题分组

我正在编写一个管道,它从一个名为车辆位置的 Kafka 主题读取数据。 这是上述主题中的记录示例: [/hfp/v2/journey/ongoing/vp/train/0090/01017/30...

回答 0 投票 0

yarn 如何知道如何在具有最多可用核心的节点管理器节点上运行消耗核心的任务?

我们有 10 个与数据节点共同托管的节点管理器节点 节点上可用的 Vcore 描述如下 Vcore使用Vcore Avilble 节点管理器 1 56 6...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.