rdd 相关问题

弹性分布式数据集(RDD)是一种分布式内存抽象,允许程序员在大型集群上执行内存计算,同时保留MapReduce等数据流模型的容错能力。

如何在 PySpark 中从数组中提取元素

我有一个具有以下类型的数据框: 列1|列2|列3|列4 xxxx|yyyy|zzzz|[1111],[2222] 我希望我的输出具有以下类型: 列1|列2|列3|列4|列5 xxxx|yyyy|zzzz|1111|2222 我的 col4 我...

回答 2 投票 0

如何使用spark RDD操作获得防御力最大的所有神奇宝贝?

我尝试使用spark RDD操作找到所有具有最高防御值的神奇宝贝,但我只找到了具有最高防御值的3个神奇宝贝中的一个。有什么办法可以得到...

回答 3 投票 0

在将 pyspark 数据帧中的字符串数据转换为字典时,由于阶段失败而中止作业

我在 pyspark 数据框中有以下数据,其中两列都包含字符串数据。 数据 = [(123, '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME...

回答 1 投票 0

使用索引列查询数组列

所以我有两栏 | col_arr | col_ind | |[1,2,3]| [0, 2] | |[5, 1] | [1] | 我希望我的结果是通过 col_ind 提取 col_arr 中的值,从而得到下面的 col_val : |

回答 1 投票 0

Spark JavaRDD 程序读取 csv 和过滤器

如何使用映射和过滤功能使用 RDD 读取 csv 文件,并使用 csv 文件根据特定列进行选择?这是一个 csv 文件示例。 供应商 ID、tpep_pickup_datetime、tpep_dropoff_dat...

回答 2 投票 0

使用元组中的随机密钥创建 Pyspark RDD

我正在研究 Apache Spark,发现了一些有趣的事情。当我使用键值对创建新的 rdd 时,其中键是从元组中随机选择的 - reducebykey 的结果不正确。 来自

回答 1 投票 0

Spark 中的宽变换和窄变换

我有一个问题:Python Spark 的宽变换和窄变换在 RDD 和结构化 API 中都可以找到,对吗? 我的意思是,我想我明白了广义变换和狭义变换之间的区别。我的

回答 1 投票 0

停止 Spark 数据帧分发到集群 - 它需要保留在驱动程序上

我们有一个在 Spark 集群工作线程上进行计算的工作负载(CPU 密集型)。 结果被拉回驱动程序,该驱动程序拥有大量内存分配来通过 RDD .collect() 收集结果 重新...

回答 1 投票 0

PicklingError:无法序列化对象:IndexError:元组索引超出范围

我在cmd中启动了pyspark并执行了下面的操作来磨练我的技能。 C:\Users\Administrator>SUCCESS:PID 5328 的进程(PID 4476 的子进程)已终止。 成功:过程

回答 3 投票 0

在 Spark RDD 中为另一个 RDD 中的每条记录查找最接近的记录

我有两个巨大的 RDD,对于其中一个记录中的每条记录,我需要找到另一个具有相同键的物理上最接近的(纬度/经度)点。但是...在每个 RDD 中,都有数百个或数百万个 r...

回答 1 投票 0

PySpark 拆分为 (Key, Array[Values]) 最佳实践

我对 Spark 相当陌生,我有兴趣从一开始就了解什么是最佳实践,但到目前为止找不到类似的查询。我得到了一些(很多)数据,其中包含以下行...

回答 1 投票 0

如何将Spark RDD中的数据放入Mysql表中

我必须将数据从Spark RDD移动到Mysql表。有人可以帮我解决这个问题吗?

回答 1 投票 0

pyspark - 连接两个 RDD - 缺少第三列

我是 Pyspark 的新手,请考虑:) 基本上我有这两个文本文件: 文件1: 1,9,5 2,7,4 3,8,3 文件2: 1,克,小时 2,1,j 3,k,i 以及Python代码: file1 = sc.textFile("/

回答 1 投票 0

Spark RDD 分区程序在 RDD 中找不到分区

学习自定义Spark RDD Partitioner,编写了一些逻辑,但不编译。 在 Spark 2.4.3 中,启动 Spark shell : 案例类交易(名称:字符串,金额:双倍,国家/地区:字符串) val交易...

回答 1 投票 0

pySpark 将列表或 RDD 元素转换为值(int)

我正在使用 pySpark 来计算标记化 RDD 中的元素数量。 这是要素之一: ('b00004tkvy', ['诺亚', '方舟', '活动', '中心', '宝石', '案例', '年龄', '3', '8', '胜利', '多媒体'. ..

回答 1 投票 0

将 Spark Streaming 状态保存到外部数据库

如果 Spark 应用程序代码有任何更改,Spark Streaming 检查点将无法工作...所以我想将状态信息显式保存到像 cassandra 这样的外部数据库中。 如何冲洗火花

回答 2 投票 0

Spark:减去两个 DataFrames

在 Spark 1.2.0 版本中,可以使用带有 2 个 SchemRDD 的减法来最终得到与第一个不同的内容 val onlyNewData = 今天SchemaRDD.subtract(yesterdaySchemaRDD) 仅新数据

回答 7 投票 0

Pyspark RDDReducebyKey()

`嗨,我想了解我的代码中的问题出在哪里。 #代码 rdd_avg=sc.parallelize([("a",10),("b",15),("c",20),("a",8)]) rdd_sp1=rdd_avg.map(lambd...

回答 1 投票 0

如何将RDD元素类型从list改为Int?

目前,我的 RDD 看起来像 [[2475], [1900], [2300]] 我希望我的 RDD 是这样的 [2475, 1900, 2300]

回答 2 投票 0

spark Scala 中基于自定义文件大小的分区器

我有一个要求,我有一个 s3 文件路径列表及其文件大小 Seq[(String, Int)],所以我创建了一个相同的 RDD val rdd: RDD[(String, Int)] = driverContext.sc.parallelize(

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.