将 pyspark 数据帧转换为代码/语法

问题描述 投票:0回答:1

假设我在 Databricks 中有以下 pyspark 数据框:

一些其他_列 价格_历史记录
测试1 [{“日期”:“2021-03-21T01:20:33Z”,“price_tag”:“N”,“价格”:“9.23”,“price_promotion”:“1.34”,“AKT”:假,” my_column":null,"供应商":"some_supplier"}]
测试2 [{“日期”:“2021-03-23T01:20:33Z”,“price_tag”:空,“价格”:“10.40”,“price_promotion”:空,“AKT”:true,“my_column”:“某事”,“供应商”:null}]

可以直接看到“price_history”列中有一些嵌套的列结构。但是我如何获得构建这个表的代码呢?有没有办法将此表转换为实际代码,然后可用于再次创建表?

我知道获取有关表的更多信息的唯一方法是使用

df.schema
但生成的模式并没有告诉我如何手动创建具有复杂数据结构(如结构、映射类型、数组类型等)的数据帧。如果有一种方法可以更轻松地编写代码测试,因为可以轻松创建示例数据。

所以我想要得到的是以下代码:

from datetime import datetime
from decimal import Decimal

from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, TimestampType, BooleanType,
                               DecimalType)

schema = StructType([
    StructField("some_other_column", StringType()),
    StructField('price_history', ArrayType(StructType([
        StructField('date', TimestampType()),
        StructField('price_tag', StringType()),
        StructField('price', DecimalType(12, 2)),
        StructField('price_promotion', DecimalType(12, 2)),
        StructField('AKT', BooleanType()),
        StructField("my_column", StringType()),
        StructField("supplier", StringType()),
    ]))),
])

data = [
    ('test1', [(datetime(2021, 3, 21, 1, 20, 33), 'N',
                Decimal('9.23'), Decimal('1.34'), False, None, 'some_supplier')]),
    ('test2', [(datetime(2021, 3, 23, 1, 20, 33), None,
                Decimal('10.40'), None, True, 'something', None)]),
]
df = spark.createDataFrame(schema=schema, data=data)
python pyspark
1个回答
0
投票

我不知道有什么解决方案可以生成构建表格的代码。也许 ChatGPT 可以帮助解决这个问题,但这是一个从平面列创建结构数组的示例:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
import datetime

spark = SparkSession.builder.getOrCreate()

initial_df = spark.createDataFrame([
  ('test1', datetime.datetime(2024, 1, 8, 12), 'N'),
  ('test1', datetime.datetime(2024, 1, 8, 13, 30), 'Y'),
  ('test2', datetime.datetime(2024, 1, 8, 14), 'N')
], ['some_other_column', 'date', 'price_tag'])

refined_df = (
  initial_df
  .withColumn('price_detail', f.struct(*[f.col(element) for element in ['date', 'price_tag']]))
  .groupBy('some_other_column')
  .agg(f.collect_list(f.col('price_detail')).alias('price_history'))
)

refined_df.show(truncate = False)
refined_df.printSchema()

输出:

+-----------------+----------------------------------------------------+        
|some_other_column|price_history                                       |
+-----------------+----------------------------------------------------+
|test1            |[{2024-01-08 12:00:00, N}, {2024-01-08 13:30:00, Y}]|
|test2            |[{2024-01-08 14:00:00, N}]                          |
+-----------------+----------------------------------------------------+

root
 |-- some_other_column: string (nullable = true)
 |-- price_history: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- date: timestamp (nullable = true)
 |    |    |-- price_tag: string (nullable = true)
© www.soinside.com 2019 - 2024. All rights reserved.