我遇到的数据是这样的:
req_id dsp_price_style
0 "1000:10,1001:100,1002:5,1003:7"
1 "1002:5,1000:100,1001:15,1003:6"
字段'dsp_price_style'的值格式为dsp_id_0:price_0,dsp_id_1:price_1,.....,dsp_id_n:price_n,虽然它们不在dsp_id之间排序,但我只需要dsp_id为'1000'和dsp_id的数据是'1001'及其价格,然后使用pyspark将这4个数据添加为新列。
req_id dsp_0价格_0 dsp_1价格_1
0 "1000" "10" "1001" "100"
1 "1000" "100" "1001" "15"
如何在pyspark中以最佳性能实现此功能?
我在这里有一种方法。
data = [
[0,"1000:10,1001:100,1002:5,1003:7"],
[1,"1002:5,1000:100,1001:15,1003:6"]
]
data_sdf = spark.createDataFrame(data, ['req_id', 'dsp_price_style'])
# the function to split
def split_dsp_price(row):
ls_x = row.dsp_price_style.split(sep=',')
return [row.req_id] + reduce(lambda x, y: x + y, [k.split(sep=':') for k in ls_x if k[:4] in ['1000', '1001']])
fnl_data_rdd = data_sdf.rdd.map(lambda r: split_dsp_price(r))
fnl_data_rdd.take(2)
# [[0, '1000', '10', '1001', '100'], [1, '1000', '100', '1001', '15']]
可以进一步转换为DataFrame
或进一步处理。