Job aborted due to stage failure when converting string data to a dictionary in a PySpark DataFrame


I have the following data in a PySpark DataFrame, where the val column contains string data (a JSON array stored as a string).

data = [(123, '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]')]
ar = spark.createDataFrame(data, ['id', 'val'])
id   val
123  [{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]

Now my goal is to convert the string data in the val column into a dictionary. For example:

{
 "A": 0.1,
 "B": 0.2,
 "C": 0.3,
 "D": 0.4
}

so that the data looks like this:

id   val
123  {"A": 0.1, "B": 0.2, "C": 0.3, "D": 0.4}

Note: I also need to convert the values in FLD_VAL to decimals.
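For reference, the target transformation for a single val string can be sketched in plain Python with the standard json and decimal modules. The function name to_decimal_map is hypothetical; building each Decimal from the original string keeps "0.1" exactly 0.1 instead of a binary float approximation.

```python
import json
from decimal import Decimal

def to_decimal_map(val: str) -> dict:
    """Parse one JSON val string into {FLD_NAME: Decimal(FLD_VAL)}."""
    entries = json.loads(val)  # list of {"FLD_NAME": ..., "FLD_VAL": ...} dicts
    # Decimal built from the string keeps the exact textual value (no float rounding)
    return {e["FLD_NAME"]: Decimal(e["FLD_VAL"]) for e in entries}

val = ('[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},'
       '{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]')
result = to_decimal_map(val)
print(result)
# {'A': Decimal('0.1'), 'B': Decimal('0.2'), 'C': Decimal('0.3'), 'D': Decimal('0.4')}
```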

I tried the following code:

def func(rows) :
  lp= { row['FLD_NAME'] : row['FLD_VAL'] for row in rows }
  return lp
arr = ar\
    .rdd\
    .map(lambda row: (row[0], func(row[1])))\
    .groupByKey()\
    .toDF(["id","val"])

This code raises the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 20.0 failed 4 times, most recent failure: Lost task 15.3 in stage 20.0 (TID 186) (10.5.152.101 executor 0): org.apache.spark.api.python.PythonException: 'TypeError: string indices must be integers', from <
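The TypeError comes from func itself: row[1] is the raw JSON string, not a parsed list, so "for row in rows" iterates over its characters, and indexing a one-character string with 'FLD_NAME' raises exactly this error. A minimal plain-Python reproduction, using a shortened copy of the data and the question's func unchanged:

```python
import json

raw = '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"}]'

def func(rows):
    # Same logic as the question's func
    return {row['FLD_NAME']: row['FLD_VAL'] for row in rows}

# Passing the raw string: iteration yields characters such as '[' and '{',
# and '['['FLD_NAME'] raises "TypeError: string indices must be integers".
error_message = None
try:
    func(raw)
except TypeError as exc:
    error_message = str(exc)

# Decoding the JSON first gives a list of dicts, which func handles fine.
fixed = func(json.loads(raw))
print(error_message)
print(fixed)
```

In the RDD version this would mean calling func(json.loads(row[1])) inside map. The groupByKey step is probably unnecessary if each id occurs in only one row (an assumption about the data); it would only wrap each dict in an iterable.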
Answer:

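Here is one way to do it. Basically: parse the string as JSON, explode the array, extract the individual fields, and build a map from the two collected lists.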

I'm sure this can be done more elegantly with transform, but this version is more self-explanatory.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, MapType, StringType

spark = SparkSession.builder.master("local").getOrCreate()

data = [(123, '[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]')]
df1 = spark.createDataFrame(data, ['id', 'val'])



df1.show(n=10, truncate=False)
print("Collect columns into list")

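# Each array element is parsed as a string -> string map
my_schema = ArrayType(MapType(StringType(), StringType()))

# Parse the JSON string into array<map<string,string>>
intermediate_df = df1.withColumn("list_dict", F.from_json("val", schema=my_schema)).drop("val")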

print("intermediate_df dataframe")
intermediate_df.show(n=20, truncate=False)

# Explode: one row per {FLD_NAME, FLD_VAL} map
intermediate_df = intermediate_df.withColumn("dict_ele", F.explode(F.col("list_dict"))).drop("list_dict")

# Pull both fields into their own columns; FLD_VAL is cast to a number
intermediate_df = intermediate_df.withColumn("FLD_NAME", F.col("dict_ele").getField("FLD_NAME")) \
                                 .withColumn("FLD_VAL", F.col("dict_ele").getField("FLD_VAL").cast(FloatType())) \
                                 .drop("dict_ele")

print("intermediate_df dataframe")
intermediate_df.show(n=20, truncate=False)

# Re-assemble one map per id from the collected names and values
intermediate_df = intermediate_df.groupby("id").agg(F.map_from_arrays(F.collect_list("FLD_NAME"), F.collect_list("FLD_VAL")))

print("intermediate_df dataframe")
intermediate_df.show(n=20, truncate=False)

Output:

+---+-------------------------------------------------------------------------------------------------------------------------------------+
|id |val                                                                                                                                  |
+---+-------------------------------------------------------------------------------------------------------------------------------------+
|123|[{"FLD_NAME":"A","FLD_VAL":"0.1"},{"FLD_NAME":"B","FLD_VAL":"0.2"},{"FLD_NAME":"C","FLD_VAL":"0.3"},{"FLD_NAME":"D","FLD_VAL":"0.4"}]|
+---+-------------------------------------------------------------------------------------------------------------------------------------+

Collect columns into list
intermediate_df dataframe
+---+------------------------------------------------------------------------------------------------------------------------------------+
|id |list_dict                                                                                                                           |
+---+------------------------------------------------------------------------------------------------------------------------------------+
|123|[{FLD_NAME -> A, FLD_VAL -> 0.1}, {FLD_NAME -> B, FLD_VAL -> 0.2}, {FLD_NAME -> C, FLD_VAL -> 0.3}, {FLD_NAME -> D, FLD_VAL -> 0.4}]|
+---+------------------------------------------------------------------------------------------------------------------------------------+

intermediate_df dataframe
+---+--------+-------+
|id |FLD_NAME|FLD_VAL|
+---+--------+-------+
|123|A       |0.1    |
|123|B       |0.2    |
|123|C       |0.3    |
|123|D       |0.4    |
+---+--------+-------+

intermediate_df dataframe
+---+--------------------------------------------------------------+
|id |map_from_arrays(collect_list(FLD_NAME), collect_list(FLD_VAL))|
+---+--------------------------------------------------------------+
|123|{A -> 0.1, B -> 0.2, C -> 0.3, D -> 0.4}                      |
+---+--------------------------------------------------------------+
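One caveat: the answer casts FLD_VAL to FloatType, which Spark stores as a 4-byte single-precision float, while the question asks for decimals. show() prints 0.1, but the stored value is only the nearest single-precision number. Using a DecimalType(precision, scale) in the cast instead of FloatType() keeps the values exact (the precision and scale would depend on the data). The plain-Python snippet below emulates single precision with the standard struct module to show the difference:

```python
import struct
from decimal import Decimal

def as_float32(x: float) -> float:
    """Round-trip x through IEEE 754 single precision (4 bytes),
    the storage format behind Spark's FloatType."""
    return struct.unpack("<f", struct.pack("<f", x))[0]

stored = as_float32(0.1)
exact = Decimal("0.1")

# The float32 value is close to, but not exactly, 0.1
print(stored)
print(Decimal(stored) - exact)
```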