Pyspark - transform an array of strings into a map, then the map into columns, using pyspark rather than a UDF or other performance-intensive transformations

Question · votes: 0 · answers: 1

I am processing some data that has some key-value headers plus a payload. I have successfully parsed the headers into the following array:

+--------------------------------------------------------+--------------+
| col(header)                                            | col(payload) |
+--------------------------------------------------------+--------------+
| [someheader, key1, value1, key2, value2, key3, value3] | payload      |
+--------------------------------------------------------+--------------+

someheader will be dropped, and the rest needs to be converted into a map, like this:

{
    key1: value1,
    key2: value2,
    key3: value3,
}

I then turn these key-value pairs into columns, like this (the schema is known, although knowing how to do this with a dynamic schema would be cool):

+-------------+--------+--------+-----------+
|key1         |key2    | key3   |payload    |
+-------------+--------+--------+-----------+
|value1       |value2  | value3 |payload    |
+-------------+--------+--------+-----------+

How can we do the array-to-map conversion in pyspark without a Python UDF or any similar performance-suboptimal construct? Also, I don't want to involve pandas.

Here is what I have so far - it works, but from reading the Spark docs, UDFs are known to carry a performance penalty (and to be something of an anti-pattern):

# imports
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col, split, expr, udf, regexp_replace, lit

# udf function to convert arr to map
def to_map(arr):
    map_value = dict()
    for i in range(1,len(arr),2):
        key = arr[i]
        value = arr[i+1]
        map_value[key] = value
    return map_value

# spark session with AWS creds
spark = SparkSession\
            .builder\
            .appName("test-app")\
            .config("spark.hadoop.fs.s3a.access.key", "<aws access key>") \
            .config("spark.hadoop.fs.s3a.secret.key", "<aws secret key>") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .getOrCreate()

# read the file
df = spark.read.text('s3a://commoncrawl/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/wet/CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz', lineSep="\n\r\n\r\n")

# trim whitespaces, not just spaces
df = df.select(regexp_replace(df.value, r"^\s+|\s+$", "").alias("value"))

# split string into header and payload columns
df = df.select(split(df.value, '\r\n\r\n').alias('s')) \
    .withColumn('header', expr('s[0]')) \
    .withColumn('payload', expr('s[1]')) \
    .select(col('header'), col('payload'))

# split the header into key value pair list 
# example: WARC/1.0\r\nWARC-Type: warcinfo\r\nWARC-Date: 2023-12-12T01:40:15Z\r\nWARC-Filename: CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz\r\nWARC-Record-ID: <urn:uuid:6082596d-524a-4e49-b1dd-86582dc01a2f>\r\nContent-Type: application/warc-fields\r\nContent-Length: 382
# becomes: [WARC/1.0, WARC-Type, warcinfo, WARC-Date, 2023-12-12T01:40:15Z, WARC-Filename, CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz, WARC-Record-ID, <urn:uuid:6082596d-524a-4e49-b1dd-86582dc01a2f>, Content-Type, application/warc-fields, Content-Length, 382]
df = df.select(split(df.header, ': |\r\n').alias('h'), col('payload'))
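(An aside: a shortcut I have not benchmarked would be to skip the intermediate array entirely and build the map straight from the raw header string with Spark SQL's built-in str_to_map, whose delimiters are treated as regular expressions. The bare WARC/1.0 line has no ': ' separator, so it would simply become a key with a null value.)

# sketch (not benchmarked): build the map directly from the raw header string,
# applied to the dataframe from the step above (which still has the 'header'
# column); str_to_map treats both delimiters as regular expressions
df_alt = df.withColumn('header_map', expr(r"str_to_map(header, '\r\n', ': ')"))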

# create a UDF for the to_map function defined above
array_to_map = udf(to_map, MapType(StringType(),StringType()))

# convert array to map, and then map to columns using the known schema (how would one do it without a known schema?)
df = df.withColumn('header_map',array_to_map(df.h)) \
    .select(expr("header_map['WARC-Filename']").alias("warcFileName"), 
            expr("header_map['Content-Length']").cast("integer").alias("contentLength"),
            expr("to_timestamp(header_map['WARC-Date'])").alias("warcDate"),
            expr("header_map['WARC-Record-ID']").alias("warcRecordId"),
            expr("header_map['Content-Type']").alias("contentType"),
            expr("header_map['WARC-Type']").alias("warcType"),
            expr("header_map['WARC-Refers-To']").alias("warcRefersTo"),
            expr("header_map['WARC-Identified-Content-Language']").alias("language"),
            expr("header_map['WARC-Block-Digest']").alias("blockDigest"),
            expr("header_map['WARC-Target-URI']").alias("url"),
            expr("header_map['WARC-Target-URI']").alias("docId"),
            expr("header_map['WARC-Target-URI']").alias("partitionKey"),
            col('payload').alias("docText")) \
    .where(col('warcType') == lit('conversion')) \
    .drop('warcFileName')

df.show(n=5, truncate=False)
df.printSchema()
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
|contentLength|warcDate           |warcRecordId                                   |contentType|warcType  |warcRefersTo                                   |language|blockDigest                          |url                                                                  |docId                                                                |partitionKey                                                         |docText          |
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
|27923        |2023-11-28 09:43:44|<urn:uuid:783397a2-6c03-41ac-ab5b-bd57685a0ed2>|text/plain |conversion|<urn:uuid:20764b8a-6ab0-4c23-ba54-40854a70b3af>|zho     |sha1:4XCBUNJVBXKQ63MKM2KTMGUPGC74FTSD|http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA   |http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA   |http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA   | ... <truncated> |
|23738        |2023-11-28 09:18:49|<urn:uuid:87276745-6924-46c4-bf1a-55b51cd5c9b0>|text/plain |conversion|<urn:uuid:ed65b632-8ba1-4f1a-b57c-f359cbaea9a2>|zho     |sha1:SERSYY5GWIAD2CBRTFHRYJVDFC5QBJRZ|http://0455.hk/index.asp?areaid=53                                   |http://0455.hk/index.asp?areaid=53                                   |http://0455.hk/index.asp?areaid=53                                   | ... <truncated> |
|43827        |2023-11-28 10:30:30|<urn:uuid:306608d1-bb8d-4071-83aa-388a9333791c>|text/plain |conversion|<urn:uuid:7acff67a-fd7c-4747-a269-e542af700ce3>|eng,zho |sha1:RYKE6ANAB674262OQ2VBDINQM3IXQC45|http://0554c.com/index/Article/info.html?cate=5&id=11                |http://0554c.com/index/Article/info.html?cate=5&id=11                |http://0554c.com/index/Article/info.html?cate=5&id=11                | ... <truncated> |
|1137         |2023-11-28 10:58:30|<urn:uuid:0870b29d-fc06-4a44-97ef-6876bc0fbbee>|text/plain |conversion|<urn:uuid:17042b32-0223-4589-8ceb-644a887e2776>|zho,eng |sha1:J7XVE2KML4YTJYN33VEL4SZRJSAGBKRJ|http://0571ztq.com/jiage/class/?31.html                              |http://0571ztq.com/jiage/class/?31.html                              |http://0571ztq.com/jiage/class/?31.html                              | ... <truncated> |
|1921         |2023-11-28 10:41:49|<urn:uuid:f95cd1dd-3bf8-4272-9f3b-10f847d543e6>|text/plain |conversion|<urn:uuid:fb642dce-08c9-4b41-bc93-13907080b95d>|deu,eng |sha1:IWDUDKUJVRYILJZ4UIHGADV2FFB3JCPS|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details| ... <truncated> |
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
only showing top 5 rows

root
 |-- contentLength: integer (nullable = true)
 |-- warcDate: timestamp (nullable = true)
 |-- warcRecordId: string (nullable = true)
 |-- contentType: string (nullable = true)
 |-- warcType: string (nullable = true)
 |-- warcRefersTo: string (nullable = true)
 |-- language: string (nullable = true)
 |-- blockDigest: string (nullable = true)
 |-- url: string (nullable = true)
 |-- docId: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- docText: string (nullable = true)
python apache-spark pyspark
1 Answer

0 votes

You can indeed avoid a UDF by using a few functions from pyspark.sql.functions.

We start by creating the dataframe:

from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (
            ["someheader", "key1", "value1", "key2", "value2", "key3", "value3"],
            "payload",
        ),
    ],
    ["header", "payload"],
)

Then we compute the output:

# Calculate the array column's length (assumed identical for every row - see
# the note at the end) to be able to extract the keys and values later on
array_length = (
    df.withColumn("array_length", F.size("header"))
    .agg(F.first("array_length"))
    .collect()[0][0]
)

output = (
    df.select(
        F.explode(
            F.map_from_arrays(
                F.array([F.col("header")[i] for i in range(1, array_length, 2)]),
                F.array([F.col("header")[i] for i in range(2, array_length, 2)]),
            )
        ),
        "payload",
    )
    .groupBy("payload")
    .pivot("key")
    .agg(F.first("value"))
)

>>> output.show()
+-------+------+------+------+
|payload|  key1|  key2|  key3|
+-------+------+------+------+
|payload|value1|value2|value3|
+-------+------+------+------+

The idea is as follows:

  • We extract the keys and values by indexing into the original array column (odd indices are keys, even indices are values)
  • We then combine those two arrays into a single map column
  • After that, we explode the map into one key/value row per entry

You can then simply group by and pivot to obtain the dataframe you want, as in the sketch below.
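One possible refinement, sketched below under the assumption that the key names are known up front (the known_keys list is hypothetical): passing the expected values to pivot spares Spark the extra job it otherwise runs to discover the distinct keys.

# sketch: with a known schema, listing the keys in pivot() avoids the extra
# pass Spark makes to collect the distinct key values before pivoting;
# known_keys is hypothetical - adapt it to your actual headers
known_keys = ["key1", "key2", "key3"]

output = (
    df.select(
        F.explode(
            F.map_from_arrays(
                F.array([F.col("header")[i] for i in range(1, array_length, 2)]),
                F.array([F.col("header")[i] for i in range(2, array_length, 2)]),
            )
        ),
        "payload",
    )
    .groupBy("payload")
    .pivot("key", known_keys)
    .agg(F.first("value"))
)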

Note: this dynamically handles keys of any length/name, but it cannot handle the case where the original header column contains arrays of different lengths within a single dataframe.
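If arrays of different lengths do need to be supported, a minimal sketch (assuming Spark >= 3.1, where pyspark.sql.functions.filter accepts an index-aware lambda) builds the map row by row, without collecting any length to the driver:

# sketch for variable-length headers (requires Spark >= 3.1): the index-aware
# form of F.filter keeps the odd positions as keys and the even positions past
# the leading "someheader" as values, independently for each row
with_map = df.withColumn(
    "header_map",
    F.map_from_arrays(
        F.filter("header", lambda x, i: i % 2 == 1),
        F.filter("header", lambda x, i: (i % 2 == 0) & (i > 0)),
    ),
)

From there, the same explode/groupBy/pivot as above recovers one column per key.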
