循环遍历大型数据帧并执行sql

问题描述 投票:0回答:1

我有一个大文件(~5GB),我已加载到数据帧中。现在我必须从每一行获取一个值(fid)并获取同一数据帧中的相应行。

var references = df.sqlContext.sql("Select authors,references,id from publications")
references.collect().foreach(ref => ref.getSeq[String](1).foreach(id => {
      val authors = ref.getSeq[String](0)
      val a = df.sqlContext.sql(s"SELECT authors from publications  WHERE id='$id'")

      }
))

我已经尝试了上面的代码,因为collect动作我得到了一个内存不足的异常。我尝试增加内存,但仍然没有成功。

scala apache-spark rdd
1个回答
1
投票

正如你的要求所说,如果authors列的值与id数组列中的任何id匹配,则选择references列。

您可以通过定义udf函数来实现您的要求

import org.apache.spark.sql.functions._
def containsUdf = udf((ref: collection.mutable.WrappedArray[String], id:String) => ref.contains(id))

df.select("authors").where(containsUdf(col("references"), col("id")))

udf函数检查references列数组是否包含id列的值。如果条件匹配,它将返回true,否则将返回false。

所有其他内置函数都按列方式执行,但udf函数逐行执行其操作。

udf函数中调用where函数,该函数在udf函数返回true时过滤行。

如果你使用filter而不是where,那就更清楚了

df.filter(containsUdf(col("references"), col("id"))).select("authors")

更新

以上解决方案用于检查每行内的条件。但是,如果您的条件是检查不同的行,那么您应该执行以下操作。我已经评论说明已完成的步骤

val tempId = df.select("id")  //creating temp table of id for inner join later on

import org.apache.spark.sql.functions._
df.select(col("authors"), explode(col("references")).as("id"))  // selecting authors and exploding references column so that each element of array in reference column is exploded to each row
  .join(tempId, Seq("id"))   // inner join the exploded dataframe with the temp table created above, this will filter out not matching id rows
  .select("authors")         // selecting only  the authors column
  .distinct()                // optional step for removing duplicate rows if any
© www.soinside.com 2019 - 2024. All rights reserved.