我有一个大文件(~5GB),我已加载到数据帧中。现在我必须从每一行获取一个值(fid)并获取同一数据帧中的相应行。
var references = df.sqlContext.sql("Select authors,references,id from publications")
references.collect().foreach(ref => ref.getSeq[String](1).foreach(id => {
val authors = ref.getSeq[String](0)
val a = df.sqlContext.sql(s"SELECT authors from publications WHERE id='$id'")
}
))
我已经尝试了上面的代码,因为collect动作我得到了一个内存不足的异常。我尝试增加内存,但仍然没有成功。
正如你的要求所说,如果authors
列的值与id
数组列中的任何id匹配,则选择references
列。
您可以通过定义udf
函数来实现您的要求
import org.apache.spark.sql.functions._
def containsUdf = udf((ref: collection.mutable.WrappedArray[String], id:String) => ref.contains(id))
df.select("authors").where(containsUdf(col("references"), col("id")))
udf
函数检查references
列数组是否包含id
列的值。如果条件匹配,它将返回true,否则将返回false。
所有其他内置函数都按列方式执行,但udf
函数逐行执行其操作。
在udf
函数中调用where
函数,该函数在udf
函数返回true时过滤行。
如果你使用filter
而不是where
,那就更清楚了
df.filter(containsUdf(col("references"), col("id"))).select("authors")
更新
以上解决方案用于检查每行内的条件。但是,如果您的条件是检查不同的行,那么您应该执行以下操作。我已经评论说明已完成的步骤
val tempId = df.select("id") //creating temp table of id for inner join later on
import org.apache.spark.sql.functions._
df.select(col("authors"), explode(col("references")).as("id")) // selecting authors and exploding references column so that each element of array in reference column is exploded to each row
.join(tempId, Seq("id")) // inner join the exploded dataframe with the temp table created above, this will filter out not matching id rows
.select("authors") // selecting only the authors column
.distinct() // optional step for removing duplicate rows if any