Problem
My data structures look like this:
train_info: (over 30,000 rows)
----------
odt:string (unique)
holiday_type:string
od_label:string
array:array<double> (variable length, depending on odt and holiday_type)
useful_index:array<int> (same length as array)
...(other unimportant cols)
label_data: (over 40,000 rows)
----------
holiday_type:string
od_label:string
l_origin_array:array<double> (variable length)
...(other unimportant cols)
My expected result looks like this (same number of rows as train_info):
--------------
odt:string
holiday_type:string
od_label:string
prediction:int
My solution looks like this:
if __name__ == '__main__':
    loop_item = train_info.collect()
    result = knn_for_loop(spark, loop_item, train_info.schema, label_data)
    # ----- do something -----
def knn_for_loop(spark, predict_list, schema, label_data):
    result = list()
    for i in predict_list:
        # Turn this Row into a one-row DataFrame, join it with label_data,
        # then use this row's useful_index to pick the matching entries
        # out of each label row's l_origin_array
        predict_df = spark.sparkContext.parallelize([i]).toDF(schema) \
            .join(label_data, on=['holiday_type', 'od_label'], how='left') \
            .withColumn('l_array',
                        UDFuncs.value_from_array_by_index(f.col('l_origin_array'), f.col('useful_index'))) \
            .toPandas()
        # run KNN on the pandas side
        train_x = predict_df.l_array.values
        train_y = predict_df.label.values
        test_x = predict_df.array.values[0]
        test_y = KNN(train_x, train_y, test_x)
        result.append((i['odt'], i['holiday_type'], i['od_label'], test_y))
    return result
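The KNN helper called above is not shown in the post; for reference, a minimal NumPy sketch of what it might look like (the name, signature, and the majority-vote rule are my assumptions based on the call site):

```python
import numpy as np

def KNN(train_x, train_y, test_x, k=3):
    """Hypothetical stand-in for the KNN helper used above: classify one
    test vector by majority vote among its k nearest training vectors."""
    train_x = np.vstack(train_x)           # stack per-row arrays into a 2-D matrix
    train_y = np.asarray(train_y)
    dists = np.linalg.norm(train_x - np.asarray(test_x), axis=1)
    nearest = np.argsort(dists)[:k]        # indices of the k closest training rows
    labels, counts = np.unique(train_y[nearest], return_counts=True)
    return int(labels[np.argmax(counts)])  # majority label among the neighbours
```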
It works, but it is really slow: I estimate about 18 s per row.
In R, I can do this easily with the do() function:
train_info %>% group_by(odt) %>% do(., knn_loop, label_data)
My attempts
I tried joining the two DataFrames up front and querying them at compute time, but the data was too large to run (after the join the result has 400 million rows and takes 180 GB of disk space on Hive, and querying it is very slow). I also tried pandas_udf, but it only accepts a single pd.DataFrame argument, and it ran slowly.
I tried a UDF, but a UDF cannot receive a DataFrame object.
I tried the spark-knn package, but it errored at runtime; I may have installed it incorrectly in my offline environment.
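For context on what I was aiming for: the per-odt logic can be written as a plain pandas function and handed to Spark's groupBy(...).applyInPandas (Spark >= 3.0), which would avoid the collect() loop. This is only a sketch under the assumption that train_info and label_data have already been joined once (which, as noted above, was itself too large in my case); the knn_predict helper and the label column are hypothetical, and it uses 1-NN for brevity:

```python
import numpy as np
import pandas as pd

def knn_predict(pdf: pd.DataFrame) -> pd.DataFrame:
    """Handle one odt group: each row of `pdf` is one joined label record
    for the same odt (columns follow the schemas above)."""
    train_x = np.vstack(pdf['l_array'].to_numpy())   # label arrays -> 2-D matrix
    train_y = pdf['label'].to_numpy()
    test_x = np.asarray(pdf['array'].iloc[0])        # this odt's own array
    nearest = np.argmin(np.linalg.norm(train_x - test_x, axis=1))  # 1-NN
    return pd.DataFrame({'odt': [pdf['odt'].iloc[0]],
                         'holiday_type': [pdf['holiday_type'].iloc[0]],
                         'od_label': [pdf['od_label'].iloc[0]],
                         'prediction': [int(train_y[nearest])]})

# With Spark, after joining train_info and label_data once:
# result = joined.groupBy('odt').applyInPandas(
#     knn_predict,
#     schema='odt string, holiday_type string, od_label string, prediction int')
```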
Thanks for your help.