rdd 相关问题

弹性分布式数据集(RDD)是一种分布式内存抽象,允许程序员在大型集群上执行内存计算,同时保留MapReduce等数据流模型的容错能力。

PySpark 中分组数据的直方图

我有由日期时间、ID 和速度组成的数据,我希望使用 PySpark 获取每个 ID 的速度直方图数据(起点/终点和计数)。样本数据: df = Spark.createDat...

回答 1 投票 0

ValueError:RDD 为空——Pyspark(Windows 独立版)

我正在尝试创建一个RDD,但spark没有创建它,抛出错误,粘贴在下面; 数据=记录.map(lambda r:LabeledPoint(extract_label(r),extract_features(r))) 第一个点=数据.第一个...

回答 2 投票 0

多次行动会引发失败

我是 Spark 新手。 我在将 df 保存到 Hive 表的部分遇到了一些问题。 def insert_into_hive_table(df: DataFrame, table_name: str): # 用于调试 - 此操作正在运行...

回答 1 投票 0

rdd.zipWithIndex() 在非常大的数据集上抛出 IllegalArgumentException

我正在 Azure Databricks 中运行 python 笔记本。尝试使用 rdd.zipWithIndex() 添加行号时出现 IllegalArgumentException 错误。该文件大小为 2.72 GB,有 1238951 行(我

回答 2 投票 0

如何在 Apache Spark 中对整数列表进行排序?

最近我开始使用 Apache Spark 对大量数据进行排序。 在我的初始测试中,我尝试在 PySpark 上并行对整数列表进行排序,但显然使用

回答 2 投票 0

(Spark 3.3.2 OpenJDK19 PySpark Pandas_UDF Python3.10 Ubuntu22.04 Dockerized)测试脚本产生类型错误:'JavaPackage'对象不可调用

我创建了一个安装 Ubuntu 22.04、Python 3.10、Spark 3.3.2、Hadoop 3、Scala 13 和 Open JDK 19 的 docker 容器。 在 AWS 中部署代码之前,我目前正在使用它作为测试环境。 这

回答 0 投票 0

PySpark:如何根据多个条件附加来自其他 pyspark 数据框的新列?

我有pyspark df1 |编号 |名称 |电子邮件|年龄|大学| |---| ------+ --------------+---+--------| |12 |斯塔 |[email protected] |25 |clg1 | |21 |丹尼 |[email protected] |23 |clg2 | |37 |

回答 0 投票 0

PySpark - 读取检查点数据帧

我目前正在使用 pyspark 为机器学习应用程序执行一些数据清理。 最后一个会话崩溃了,但我设置了一个 checkpointdir 并检查了我的 DataFrame。 现在我有

回答 1 投票 0

使用 sc.textFile() 加载本地文件以激发

问题 如何使用 sc.textFile 从本地文件系统加载文件到 Spark?我需要更改任何 -env 变量吗?此外,当我在未安装 Hadoop 的 Windows 上尝试相同操作时,我...

回答 4 投票 0

使用 BinaryClassificationMetrics 时在 Spark 中使用 map 和 reduce 进行并行计算?

我正在尝试通过使用 map 和 reduce 而不是 for 循环来并行化 Spark 中 AUC 的计算。 但是,因为我有另一个 RDD sc.parallelize(recModelPredictionsAndLabels) 而我不能...

回答 0 投票 0

PySpark - 读取文本文件并忽略行内的换行

我有一个格式如下的文本文件: A,"123 主街 林肯, NE 55555",13343 B,"345 学校街",23432 我想将其作为 2 行而不是 3 行摄取,但是有没有办法点燃...

回答 0 投票 0

在 Scala Spark 中从具有嵌套序列的数据集创建 Dataframe 的问题

我正在尝试从包含嵌套序列的序列创建数据框,但出现 scala.Match 错误。 val data = Seq(("Java", Seq(Seq(true, 5L), 0, 7.5)), ("Python&quo...

回答 1 投票 0

为什么 getNumPartitions() 为同一数据集返回不同的值?

我有一个主要数据集。我需要应用一些过滤和扩充方法,这些方法需要一些 groupby 并在此数据集上加入。 当我在完成程序后运行 df.rdd.getNumPartitions() 时......

回答 0 投票 0

如何在 Spark 中计算单次扫描中的字数和对数

我有一个由单词和数字组成的字符串标记数组,我正在尝试在 Apache Spark 中同时计算单个单词、单词-单词对和数字-单词对的计数。我...

回答 0 投票 0

Java Apache Spark RDD 持久化

我有这样的代码: JavaPairRDD A = JavaPairRDD B; B.persist(StorageLevel.MEMORY_AND_DISK()) 稍后,变量 A 可能会或可能不会重新分配给另一个 JavaPairR 的转换...

回答 0 投票 0

如何在此 Pyspark mapreduce 代码中拆分年份?

我需要为每个单词计算每年有多少篇文章包含它。我一直坚持如何用单词来划分年份,因为我不断得到与日期相连的第一个单词,如图所示

回答 2 投票 0

在同一数据框中合并 RDD 映射结果列的方法

我正在 pyspark 代码中进行一次转换并创建新列。我观察到 source_df 正在被新列取代。是否可以将新列与现有数据框合并

回答 1 投票 0

如何提升Spark性能

我正在处理一个大型数据集,其中包含大小为 (1000,10000) 的向量;我需要从数据集中找出向量 的所有三元组,总方差最多为 τ 我的电流...

回答 0 投票 0

Scala RDD 获取元素出现在列表中的次数 [关闭]

我有一个变量,它是 Ints 和字符串列表的 RDD,比如: val rdd: RDD[(Int, List[String])] 此 RDD 在 Int 部分包含年份,在 List 中包含城市名称列表。 我想知道如何...

回答 0 投票 0

PySpark - 基于另一个 RDD 过滤 RDD - 广播一个 RDD

我有两个 RDD: 内容 & 消除 两者都是逐行包含多个单词的 RDD。我想要的是过滤 remove RDD 中出现的内容中的所有单词。我正在尝试: 过滤器=内容...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.