pyspark 相关问题

Spark Python API(PySpark)将apache-spark编程模型暴露给Python。

使用 Palantir Foundry Code Repository 从具有列 RID 的数据集中导入 RID

我是代码存储库的首次发布者和用户,所以请原谅我的措辞。我正在尝试从充满 RID 的数据集中读取 RID。例如,“backing_dataset_rid”列包含...

回答 1 投票 0

AWS EMR PySpark UDF 失败并显示“无法运行命令 /usr/bin/virtualenv (...)”

我有一个 emr 版本为 6.10.0 的 emr 集群,我尝试在代码中使用 pyspark udf,但它始终失败并出现相同的错误。 数据 = [("AAA",), ("BBB",)...

回答 2 投票 0

Python 默认字典似乎给出了重复的键 - 发生了什么?

下面我的代码返回数据作为Python中的默认字典,输出如下: defaultdict(, {'[0, 13, 26, 39]': ['1']}) 默认字典( 下面的代码将数据作为 Python 中的默认字典返回,输出如下: defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1']}) defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1']}) defaultdict(<type 'list'>, {'[6, 19, 32, 45]': ['1']}) defaultdict(<type 'list'>, {'[3, 16, 29, 42]': ['1']}) 如何在上面的输出中获得重复的键? 不应该是这样吗: defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1', '1']}) defaultdict(<type 'list'>, {'[6, 19, 32, 45]': ['1']}) defaultdict(<type 'list'>, {'[3, 16, 29, 42]': ['1']}) 我正在运行的代码是 def make_bands(value): d2 = defaultdict(list) for key, val in value.iteritems(): d2[(str(list(val[0:4])))].append("1") print d2 value是另一本字典 调用函数make_bands来处理Spark RDD,如下所示: signatureBands = signatureTable.map(lambda x: make_bands(x)).collect() 首先,不,你不能期望输出是你想要的。 d2 在调用之间不保留。每次进入该函数时都会重新创建它。如果你使用一个类来保存状态、一个生成器(这在这里不太优雅)或者一个构造函数而不是 lambda 的函数(这将是我在这里的选择),你仍然可以获得你想要的东西: def build_make_bands(): d2 = defaultdict(list) def make_bands(value): for key, val in value.iteritems(): d2[(str(list(val[0:4])))].append("1") print d2 return make_bands 然后你可以这样称呼它: signatureTable.map(build_make_bands()).collect()

回答 1 投票 0

spark 结构化流作业如何处理流 - 静态 DataFrame 连接?

我有一个 Spark 结构化流作业,它从 cassandra 和 deltalake 读取映射表并与流 df 连接。我想了解这里的确切机制。火花会击中这些吗

回答 1 投票 0

Pyspark中是否可以专门处理Hudi异常

我正在从 s3 读取 Hudi 表,有时存储桶或前缀可能为空,并抛出 org.apache.hudi.exception.TableNotFoundException 。有没有办法让我导入和处理这些 sp...

回答 1 投票 0

获取 pyspark 损坏记录原因

我正在使用 Spark 读取包含一些损坏记录的 json 文件。因此,我使用选项模式 PERMISSIVE 和选项 columnNameOfCorruptRecord 来获取所有损坏的记录。一切正常,h...

回答 1 投票 0

如何修改pyspark dataframe嵌套结构列

我正在尝试对嵌套列进行匿名/哈希处理,但尚未成功。该架构看起来像这样: -- abc: 结构(可空 = true) | |-- xyz:结构(可空 = true) | | |--

回答 2 投票 0

如何使用 pyspark 更新结构体嵌套列中的值

我尝试做非常简单的事情 - 更新嵌套列的值;但是,我不知道如何 环境: 阿帕奇火花2.4.5 数据块 6.4 Python 3.7 数据DF = [ (('乔恩','','史密斯'),'1580-01-06...

回答 3 投票 0

为什么我无法让 PySpark 在“leftouter”与本身就是联接结果的 Dataframe 联接之后删除右侧的重复列?

我有以下输入数据框: 预期 = Spark.createDataFrame( # fmt:关闭 数据=[ {“id”:“1”,“组”:“1”,“开始”:1_000...

回答 1 投票 0

Pyspark:如何使用不同的列连接两个具有不同条件的不同数据集?

我将把这两个数据集在不同列的不同条件下连接起来以获得Pyspark中的一个数据集 第一个数据集 df1: RC1 RC2 RC3 响应 AB2 AB1 AB6 吉恩 AB4 AB3 AB7 谢因 AB9 AB5...

回答 2 投票 0

无法使用 Spark dataframe 和 scala 创建 CSV,而是创建文件夹名称中包含“.csv”的文件夹

我无法使用 Spark 数据框编写或创建 csv。相反,它为我创建目录。这是我的代码 com.package.dssupplier 包 导入 org.apache.spark.sql.{SaveMode、SparkSessi...

回答 1 投票 0

MEMORY_AND_DISK_DESER 如何用于 PySpark DataFrame?

有人可以解释 PySpark DataFrame 缓存/持久性的默认 MEMORY_AND_DISK_DESER 存储级别的行为吗? (似乎 DataFrame 的默认存储级别曾经来自

回答 1 投票 0

问题无法找到 s3ablock-0001-

当我尝试在 S3 上写入数据时,我在 Amazon EMR 上运行作业时遇到问题。 这是堆栈跟踪: org.apache.hadoop.util.DiskChecker$DiskErrorException:找不到任何有效的本地可怕...

回答 4 投票 0

PySpark withColumn() 函数无法识别层次结构

我有一个任务,将结构类型的嵌套结构转换为内部有结构的数组类型。为此,我遵循使用 withColumn() 函数的方法。官方文档说这个

回答 1 投票 0

如何使用 pyspark 检索文本中的列

我有一个像这样的文本文件数据, 姓名|年龄|课程|"A"|20|"科学"|"B"|23|"数学"|"C"|25|"英语" 我需要获取列名称、年龄...

回答 1 投票 0

如何更新spark中嵌套数组内的值

我目前正在尝试更新 pyspark 数据框中的列的值。 这是架构: 根 |-- 日期:字符串 |-- 票证:结构 | |-- 金钱:数组 | | |-- 元素:结构 | ...

回答 1 投票 0

读取多个 CSV 文件,每个 CSV 文件的列数不同

我想使用 PySpark 读取具有不同列数的多个 CSV 文件。 文件=['数据/f1.csv','数据/f2.csv','数据/f3.csv','数据/f4.csv','数据/f5.csv'] f1 文件有 50 列,f2 有 10 ...

回答 3 投票 0

py4j.Py4JException:方法 sql([class java.lang.String, class [Ljava.lang.Object;]) 不存在

我是火花新手。当我尝试通过我的 hadoop 主节点上的 jupyter 笔记本运行 pyspark 时,出现此错误。 使用 阿帕奇火花= 3.4.0 蟒蛇3.11 请检查下面给出的代码 来自

回答 1 投票 0

如何在Databricks中实现像ADF一样的列映射

如您所见,这是 ADF 中复制活动中数据类型的常规列映射,我们可以在其中将数据类型、列名称等从源更改为目标。 如果我想在数据库中进行相同的设置...

回答 1 投票 0

Tensorflow 图形执行权限被拒绝错误

我创建了一个包含 Tensorflow 2.11 的虚拟环境。我使用这个虚拟环境的Python作为spark驱动程序和执行程序Python。 venv zip 已作为存档添加到 P...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.