I have a dataframe containing Azure Consumption data in a Databricks Python notebook. I'm only showing a subset of the cols/rows here.
[Row(ResourceRate='0.029995920244854', PreTaxCost='0.719902085876484',
     ResourceType='Microsoft.Compute/virtualMachines', Tags=None),
 Row(ResourceRate='1.10999258782982', PreTaxCost='26.6398221079157',
     ResourceType='Microsoft.Compute/virtualMachines',
     Tags='"{ ""project"": ""70023"", ""service"": ""10043""}"')
]
I need to extract the tags from the "Tags" column and present them as (table) columns. By the way, I'm not sure where all those double quotes come from; probably from the source table being a .csv. That will most likely turn out to be easy to fix.
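Incidentally, doubling embedded quotes is exactly how CSV escapes them, so the .csv theory fits. A minimal cleanup sketch, assuming the column really is named Tags and holds the raw quoting shown above:

from pyspark.sql.functions import regexp_replace, col

# Collapse the CSV-doubled quotes ("" -> ") and strip the outer pair,
# leaving a plain JSON string like { "project": "70023", "service": "10043"}.
df_clean = df.withColumn("Tags", regexp_replace(col("Tags"), '""', '"')) \
             .withColumn("Tags", regexp_replace(col("Tags"), '^"|"$', ''))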
I'm using PySpark. I'm trying to do something like Split Spark Dataframe string column into multiple columns:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import split, posexplode, concat, expr, lit, col, first

# Give every row a stable id to group on after exploding.
df2 = df.withColumn("num", monotonically_increasing_id())

# Keep the split array and explode it with positions; note that both
# split() calls must use the same delimiter (", "), otherwise Tags[pos]
# and val drift out of step.
df3 = df2.select(
    "num",
    split("Tags", ", ").alias("Tags"),
    posexplode(split("Tags", ", ")).alias("pos", "val")
)
# display(df3)

# One row per tag fragment, named Tag0, Tag1, ... by position.
df4 = df3.drop("val") \
    .select(
        "num",
        concat(lit("Tag"), col("pos").cast("string")).alias("name"),
        expr("Tags[pos]").alias("val")
    )
# display(df4)

# Pivot the fragments back into one column per TagN.
df5 = df4.groupBy("num").pivot("name").agg(first("val"))
display(df5)
This is not quite what I want:
num   Tag0
964
1677  """project"": ""70023"", """service"": ""10024""
2040  """project"": ""70025"", """service"": ""10034""
2214
...
What I want is the tags as cols:
num   project  service  ResourceRate       PreTaxCost
964                     0.029995920244854  0.719902085876484
1677  70023    10024    1.10999258782982   26.6398221079157
2040  70025    10034    0.029995920244854  0.719902085876484
2214                    0.029995920244854  0.719902085876484
...
Here is sample code that tries to split the tags into multiple columns:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

def columnList(r):
    # Build the output column names from the first row's tags:
    # take the text between { and }, split into key/value pairs,
    # and keep the keys (plus the leading 'id' column).
    val = str(r[0].tags)
    i = int(val.index("{") + 1)
    j = int(val.index("}"))
    val = val[i:j]
    vals = val.split(",")
    collist = ['id']
    for val in vals:
        keyval = val.split(":")
        key = keyval[0]
        collist.append(key.replace('"', ""))
    return collist

def valueList(r):
    # Same parsing as columnList, but collect the values instead of the keys.
    val = r[1]
    i = int(val.index("{") + 1)
    j = int(val.index("}"))
    val = val[i:j]
    vals = val.split(",")
    valList = [r[0]]
    for val in vals:
        keyval = val.split(":")
        value = keyval[1]
        valList.append(value.replace('"', ""))
    return valList

sc = SparkSession.builder.appName("example"). \
    config("spark.driver.memory", "1g"). \
    config("spark.executor.cores", 2). \
    config("spark.max.cores", 4).getOrCreate()

df = sc.read.format("csv").option("header", "true").option("delimiter", "|").load("columns.csv")
tagsdf = df.select("id", "tags")
colList = columnList(tagsdf.rdd.take(1))
tagsdfrdd = tagsdf.rdd.map(lambda r: valueList(r))
dfwithnewcolumns = tagsdfrdd.toDF(colList)
newdf = df.drop("tags").join(dfwithnewcolumns, on=["id"])
newdf.show()
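(Note that this string surgery assumes every row has a tags value of exactly that shape; rows where Tags is None, or values containing a , or :, would break the index()/split() calls above.)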
Sample test file:

id|ResourceRate|PreTaxCost|ResourceType|Tags
1|'1.10999258782982'|'26.6398221079157'|'Microsoft.Compute/virtualMachines'|'"{ ""project"": ""70023"", ""service"": ""10043""}"'
If there is no id column, you may want to zip the RDDs instead.
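The answer doesn't show that part; a minimal sketch of one way to manufacture such an id with zipWithIndex (the names indexed and df_with_id are illustrative, not from the original code):

# zipWithIndex pairs each row with its index: (row, i).
# Append the index as an id column so the join above still works.
indexed = df.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1]])
df_with_id = indexed.toDF(df.columns + ["id"])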
IIUC, you can convert Tags into a column of JSON strings (trim the outer double quotes and collapse the doubled "" into "), and then parse that JSON to pull the individual tags out as columns.
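A sketch of where that leads, under stated assumptions: it starts from the df_clean un-quoting sketch earlier, uses a map<string,string> schema for from_json, and hardcodes the project/service keys from the sample (real tag keys may vary):

from pyspark.sql.functions import from_json, col

# Parse the cleaned JSON string into a string-to-string map...
parsed = df_clean.withColumn("Tags", from_json("Tags", "map<string,string>"))

# ...then read individual tags out of the map as ordinary columns.
result = parsed.select(
    "ResourceRate",
    "PreTaxCost",
    col("Tags")["project"].alias("project"),
    col("Tags")["service"].alias("service"),
)
result.show()  # rows with Tags=None simply come out with null tag columns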