Dynamically expanding ArrayType() columns in PySpark Structured Streaming


I have the following DataFrame:

root
 |-- sents: array (nullable = false)
 |    |-- element: integer (containsNull = true)
 |-- metadata: array (nullable = true)
 |    |-- element: float (containsNull = true)

+----------+---------------------+
|sents     |metadata             |
+----------+---------------------+
|[1, -1, 0]|[0.4991, 0.5378, 0.0]|
|[-1]      |[0.6281]             |
|[-1]      |[0.463]              |
+----------+---------------------+

I want to dynamically expand each array element into its own column, so the result looks like this:

+--------+--------+--------+-----------+-----------+-----------+
|sents[0]|sents[1]|sents[2]|metadata[0]|metadata[1]|metadata[2]|
+--------+--------+--------+-----------+-----------+-----------+
|       1|      -1|       0|     0.4991|     0.5378|        0.0|
|      -1|    null|    null|     0.6281|       null|       null|
|      -1|    null|    null|      0.463|       null|       null|
+--------+--------+--------+-----------+-----------+-----------+

However, Structured Streaming imposes many restrictions on this kind of dynamic processing.

I tried the following:

numcol = df.withColumn('phrasesNum', F.size('sents')).agg(F.max('phrasesNum')).head()
df = df.select(
    *[F.col('sents')[i] for i in range(numcol[0])],
    *[F.col('metadata')[i] for i in range(numcol[0])],
)

And also this:

df_sizes = df.select(F.size('sents').alias('sents'))
df_max = df_sizes.agg(F.max('sents'))
nb_columns = df_max.collect()[0][0]

d = c.select(*[F.map_values(c['metadata'][i]).getItem(0).alias('confidenceIntervals'+"{}".format(j)).cast(DoubleType()) for i,j in enumerate(range(F.size('sents')))],
             *[c['sents'][i].alias('phraseSents'+"{}".format(j)).cast(IntegerType()) for i,j in enumerate(range(nb_columns))])

But in Structured Streaming I can't use actions like .head(), .collect(), or .take() to build a numeric variable holding the number of columns to create dynamically. Any ideas?

Thanks, everyone.

python apache-spark dynamic pyspark spark-structured-streaming
1 Answer

You can try a function that returns the maximum size of each column (I used first instead of collect), then split each array column into multiple columns and select them all:

from pyspark.sql import functions as F

def f(x):
    # Maximum array size across column x (first() instead of collect()).
    r = df.select(F.max(F.size(x)).alias("col")).first().col
    return [df[x][i] for i in range(r)]

l = [f(column) for column in df.columns]  # change df.columns to a specific list if needed

df.select(*[b for a in l for b in a]).show()

+--------+--------+--------+-----------+-----------+-----------+
|sents[0]|sents[1]|sents[2]|metadata[0]|metadata[1]|metadata[2]|
+--------+--------+--------+-----------+-----------+-----------+
|       1|      -1|       0|     0.4991|     0.5378|        0.0|
|      -1|    null|    null|     0.6281|       null|       null|
|      -1|    null|    null|      0.463|       null|       null|
+--------+--------+--------+-----------+-----------+-----------+