我无法将FP-Growth模型融入火花中

问题描述 投票:0回答:1

拜托,你能帮帮我吗?我有一个80个CSV文件数据集和一个主服务器和4个从服务器的集群。我想在数据帧中读取CSV文件并在四个从属设备上并行化。之后,我想用group by过滤数据帧。在我的spark查询中,结果包含按“(code_ccam”,“档案”)分组的列“code_ccam”和“档案”。我想使用FP-Growth算法来检测“code_ccam”的序列,这些序列由“folder”重复。但是当我使用FPGrowth.fit()命令时,我有以下错误:

"error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.sql.Dataset[_]"

这是我的spark命令:

val df = spark.read.option("header", "true").csv("file:///home/ia/Projet-Spark-ace/Donnees/Fichiers CSV/*.csv")
import org.apache.spark.sql.functions.{concat, lit}
val df2 = df.withColumn("dossier", concat(col("num_immatriculation"), lit(""), col("date_acte"), lit(""), col("rang_naissance"), lit(""), col("date_naissance")))
val df3 = df2.drop("num_immatriculation").drop("date_acte").drop("rang_naissance").drop("date_naissance")
val df4 = df3.select("dossier","code_ccam").groupBy("dossier","code_ccam").count()
val transactions = df4.agg(collect_list("code_ccam").alias("codes_ccam")).rdd.map(x => x)
import org.apache.spark.ml.fpm.FPGrowth
val fpgrowth = new FPGrowth().setItemsCol("code_ccam").setMinSupport(0.5).setMinConfidence(0.6)
val model = fpgrowth.fit(transactions)
scala apache-spark apache-spark-mllib fpgrowth
1个回答
0
投票

非常感谢你。有效。我用collect set替换了collect_list。

© www.soinside.com 2019 - 2024. All rights reserved.