Apache Spark SQL是Spark上的“SQL和结构化数据处理”工具,Spark是一种快速通用的集群计算系统。
我正在使用 Spark 从 Kafka 读取 json 数据。我可以使用 from_json() 将 json 数据转换为 Spark 结构。调用 from_json() 时,我必须指定 json 的架构。 我想做...
需要根据条件通过迭代列表向 PySpark DF 添加新列。 new_line_id = 数组('a', 'b', 'c') 输入DF(LineID不是该DF中的PK): |线路 ID | | --------| |米...
使用 Spark-SQL 删除 DeltaTable 中的重复项
假设我有一个表“people”,包含三列:姓名(字符串)、年龄(整数)、方向(字符串)。该表包含行完全相同的重复项。为了删除...
在Spark缓存存储级别中,MEMORY_ONLY存储级别以“反序列化格式”将数据存储在内存中。数据通常不是以二进制形式保存在内存中吗?那么反序列化格式是什么呢
作为一些聚合的结果,我想出了以下 Sparkdataframe: ----------+-----------------+-----------------+ |sale_user_id|gross_profit |total_sale_volume| +------------+-----...
如何读取包含 Excel 公式的 Excel 文件以通过 PySpark lib com.crealytics.spark.excel 计算值
我有一个 Excel 文件,例如: 它使用 Excel 公式计算每个值的列 我尝试使用以下方法读取该文件: input_MonthGroup_df = Spark.read.format("com.crealytics.spark.ex...
我有一个 Pyspark 数据框,其中的字符串日期可能是 yyyyMM (例如 200802)或 yyyyMMdd (例如 20080917)。我正在尝试将这些解析为日期。我目前正在考虑的功能是
Spark SQL 不支持 JSONPATH 通配符的任何解决方法
spark.sql("""select get_json_object('{"k":{"value":"abc"}}', '$.*.value') as j""").show() 这会导致 null,而它应该返回 'a...
从spark/scala项目代码中资源文件夹中的sql文件读取查询
我在 IntelliJ 中的文件夹结构如下 src-->主-->资源-->sql-->samplequery.sql 我在文件夹 src--> main-->scala-... 中有 scala 对象文件samplequeryexecute
在 Spark DataFrame python 中将二进制字符串的列转换为 int
所以我有一个数据框,其中有一列,如下所示: +----------+ |some_colum| +----------+ | 10| | 00| | 00| | 10| | 10| | 00| | 10| | 00| | ...
我是 Spark 的新手,最近了解到它会在调用某个操作时执行所有转换。在搜索过程中,我找到了一个简单的代码来测试它,结果并不符合预期。 他...
我在尝试实现在 6 行窗口中连接值(带空格)(如果可能)的目标时遇到了问题。 我目前正在使用 Azure Databricks 和 Pyspark 3.4.1,这是
如何检查数据是否缓存在数据帧中或由于 Pyspark 中的延迟执行而尚未缓存?
我的问题与我在堆栈溢出上找到的其他问题没什么不同。我需要知道数据是否已经检索并存储在数据框中,或者是否尚未发生 我正在做
我是 PySpark 的新手。 我有一个 Spark DataFrame df,其中有一列“device_type”。 我想将“平板电脑”或“电话”中的每个值替换为“电话”,并将“PC”替换为“桌面”。 在...
如何处理 Apache Spark 中不断变化的 parquet 模式
我遇到了一个问题,我将 Parquet 数据作为 S3 中的每日块(以 s3://bucketName/prefix/YYYY/MM/DD/ 的形式),但无法从 AWS EMR Spark 中读取数据不同的日期因为...
如何在 pyspark DataFrame 上下文中调用 aes_encrypt (和其他 Spark SQL 函数)
我需要在 DataFrame 上下文中调用新的 Spark 函数 aes_encrypt。 该函数可以在 SQL 上下文中调用,如下所示: SELECT *, aes_encrypt(col1, key, 'GCM') AS col1_encrypted FROM myTab...
这里是示例数据: ID 开始时间 时间结束 1 2023-12-29 09:00:00 2023-12-31 06:00:00 2 2023-12-28 09:00:00 2023-12-31 13:00:00 我计划获取这个规范时间中每个小时的时间。 对于
将大型 Spark Dataframe 保存为 S3 中的单个 json 文件
我正在尝试将 Spark DataFrame(超过 20G)保存到 Amazon S3 中的单个 json 文件中,我保存数据帧的代码如下: dataframe.repartition(1).save("s3n://mybucket/testfile","js...
PySpark 中的 Union 静态数据帧与 Spark 结构化流数据帧?
有没有其他方法可以在 PySpark 中应用静态数据帧和结构化流数据帧之间的并集?
Spark Aggregators 的 merge 方法中可以重用其中一个缓冲区吗?
Apache Spark Aggregator 类的 merge 方法将两个缓冲区合并为一个。我可以重用其中一个缓冲区(可能修改它)而不是创建一个要返回的新缓冲区吗...