pyspark 相关问题

Spark Python API(PySpark)将apache-spark编程模型暴露给Python。

如何使用 AWS Glue 运行任意/DDL SQL 语句或存储过程

是否可以从 AWS Glue python 作业执行任意 SQL 命令,例如 ALTER TABLE?我知道我可以用它从表中读取数据,但是有没有办法执行其他数据库特定的命令...

回答 8 投票 0

pyspark UDF 引发“无名为模块”错误

我有一个带有英文国家描述符 ds_pais 的数据框。我想使用 GoogleTranslator 通过 .withColumn 添加一列,将该国家/地区描述符从英语翻译为西班牙语。 来自

回答 1 投票 0

Pyspark -- 过滤包含空值的 ArrayType 行

我是 PySpark 的初学者。假设我有一个像这样的 Spark 数据框: test_df = Spark.createDataFrame(pd.DataFrame({"a":[[1,2,3], [无,2,3], [无,无,无]]})) 现在我希望过滤...

回答 4 投票 0

Pyspark 在列为空时删除重复项

嗨我有一个这样的数据集: ID 姓名 1 A 2 无效的 2 乙 3 C 3 无效的 4 无效的 如果 ID 重复,我想保留名称不为空的唯一值 在这个例子中我想得到这个表...

回答 1 投票 0

PythonException:Pypdf 中的“KeyError:'/Root”

每当我尝试通过代码读取多个 pdf 时,我都会收到此错误 PythonException: 'KeyError: '/Root'' 。我的数据框包含 pdf 详细信息列表,包括内容和元数据。如果 df

回答 1 投票 0

根据 PySpark 中的重复列行创建两个数组

我正在使用 PySpark 并有一个数据帧,它有两列 a 和 b,数据帧中的每列/行只有一个值。 b 中可能(但并不总是)存在重复值...

回答 2 投票 0

优化Databricks中Excel文件的读取和格式化功能

我编写了一个小函数来从 CSV 文件读取数据并将输出存储在格式化的 Excel 工作簿中。该代码将在 Spark 群集上运行的 Azure Databricks 笔记本中运行。我该怎么办...

回答 1 投票 0

如何使用Pyspark将Json中的None值转换为null?

目前我正在解析我的 Json feed: rdd = self.spark.sparkContext.parallelize([(json_feed)]) df = self.spark.read.json(rdd) 只要值都存在就可以了,但是如果我有......

回答 2 投票 0

如何在Pyspark中读取Avro文件?

如何在 Jupyter Notebook 上读取 Pyspark 中的 Avro 文件?! 从 Spark 2.4 开始,Avro 是内置但外部的数据源模块。请按照“Apach...

回答 1 投票 0

如何读取Pyspark Jupiter笔记本中的Avro文件?

如何在 Jupyter Notebook 上读取 Pyspark 中的 Avro 文件?! 从 Spark 2.4 开始,Avro 是内置但外部的数据源模块。请按照“Apach...

回答 1 投票 0

使用本地文件进行 Spark 流式处理(Python)

有没有办法扫描本地文件系统以查找特定文件夹中的更改,就像使用 HDFS (GitHub 示例)一样?使用常规路径或带有 hdfs:// 的 URI 运行它似乎可以工作,但是使用...

回答 1 投票 0

如何在 AWS Glue 工作线程中记录消息(在地图函数内)?

我能够按照 https://docs.aws.amazon.com/glue/latest/dg/monitor-continuous-logging-enable.html 中的说明进行操作,并在驱动程序中记录消息。但是当我尝试在地图内使用记录器时

回答 3 投票 0

使用 Palantir Foundry Code Repository 从具有列 RID 的数据集中导入 RID

我是代码存储库的首次发布者和用户,所以请原谅我的措辞。我正在尝试从充满 RID 的数据集中读取 RID。例如,“backing_dataset_rid”列包含...

回答 1 投票 0

AWS EMR PySpark UDF 失败并显示“无法运行命令 /usr/bin/virtualenv (...)”

我有一个 emr 版本为 6.10.0 的 emr 集群,我尝试在代码中使用 pyspark udf,但它始终失败并出现相同的错误。 数据 = [("AAA",), ("BBB",)...

回答 2 投票 0

Python 默认字典似乎给出了重复的键 - 发生了什么?

下面我的代码返回数据作为Python中的默认字典,输出如下: defaultdict(, {'[0, 13, 26, 39]': ['1']}) 默认字典( 下面的代码将数据作为 Python 中的默认字典返回,输出如下: defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1']}) defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1']}) defaultdict(<type 'list'>, {'[6, 19, 32, 45]': ['1']}) defaultdict(<type 'list'>, {'[3, 16, 29, 42]': ['1']}) 如何在上面的输出中获得重复的键? 不应该是这样吗: defaultdict(<type 'list'>, {'[0, 13, 26, 39]': ['1', '1']}) defaultdict(<type 'list'>, {'[6, 19, 32, 45]': ['1']}) defaultdict(<type 'list'>, {'[3, 16, 29, 42]': ['1']}) 我正在运行的代码是 def make_bands(value): d2 = defaultdict(list) for key, val in value.iteritems(): d2[(str(list(val[0:4])))].append("1") print d2 value是另一本字典 调用函数make_bands来处理Spark RDD,如下所示: signatureBands = signatureTable.map(lambda x: make_bands(x)).collect() 首先,不,你不能期望输出是你想要的。 d2 在调用之间不保留。每次进入该函数时都会重新创建它。如果你使用一个类来保存状态、一个生成器(这在这里不太优雅)或者一个构造函数而不是 lambda 的函数(这将是我在这里的选择),你仍然可以获得你想要的东西: def build_make_bands(): d2 = defaultdict(list) def make_bands(value): for key, val in value.iteritems(): d2[(str(list(val[0:4])))].append("1") print d2 return make_bands 然后你可以这样称呼它: signatureTable.map(build_make_bands()).collect()

回答 1 投票 0

spark 结构化流作业如何处理流 - 静态 DataFrame 连接?

我有一个 Spark 结构化流作业,它从 cassandra 和 deltalake 读取映射表并与流 df 连接。我想了解这里的确切机制。火花会击中这些吗

回答 1 投票 0

Pyspark中是否可以专门处理Hudi异常

我正在从 s3 读取 Hudi 表,有时存储桶或前缀可能为空,并抛出 org.apache.hudi.exception.TableNotFoundException 。有没有办法让我导入和处理这些 sp...

回答 1 投票 0

获取 pyspark 损坏记录原因

我正在使用 Spark 读取包含一些损坏记录的 json 文件。因此,我使用选项模式 PERMISSIVE 和选项 columnNameOfCorruptRecord 来获取所有损坏的记录。一切正常,h...

回答 1 投票 0

如何修改pyspark dataframe嵌套结构列

我正在尝试对嵌套列进行匿名/哈希处理,但尚未成功。该架构看起来像这样: -- abc: 结构(可空 = true) | |-- xyz:结构(可空 = true) | | |--

回答 2 投票 0

如何使用 pyspark 更新结构体嵌套列中的值

我尝试做非常简单的事情 - 更新嵌套列的值;但是,我不知道如何 环境: 阿帕奇火花2.4.5 数据块 6.4 Python 3.7 数据DF = [ (('乔恩','','史密斯'),'1580-01-06...

回答 3 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.