如何通过Spark Streaming按ngram进行过滤?

问题描述 投票:0回答:1

我正在使用Spark Streaming从S3中读取一些CSV文件。文件有3列。一列称为movie_plot,我需要做的是过滤掉与该列的值无法匹配某些查询的记录。查询可以由一个或多个术语组成(具有一个或多个距离词)。例如:

This is a sample of plot that is bad because includes John Smith as actor.
Also it is a drama romantic adventure war movie.

由于某些OR条件应被过滤掉:

  • 包括sample
  • 包括John Smith作为演员
  • 包括词干dram-(可能是dramadramaticdramatisation等)
  • 包括两个单词romanticwar,它们之间只有一个单词距离(中间是adventure

为了流式传输文件(当前只是在控制台上打印出结果),我这样写:

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming._
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

object MoviesStream extends App {

  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.serializer", classOf[KryoSerializer].getName)
    .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
    .appName("MoviesStream")
    .getOrCreate()

  val ssc = new StreamingContext(spark.sparkContext, Seconds(30))

  val movieFilesSchema = new StructType()
  .add("movie_id", "string")
  .add("movie_name", "string")
  .add("movie_plot", "string")

  val moviesReadStream = spark.readStream
    .schema(movieFilesSchema)
//    .option("latestFirst", "true")
    .option("maxFilesPerTrigger", "20")
    .csv("s3a://movies-bucket/*")
    .select("*")

    val moviesWriteStream = moviesReadStream
    .writeStream
    .foreach(
      new ForeachWriter[Row] {

        def open(partitionId: Long, version: Long): Boolean = true

        def process(row: Row) = {
          row.getValuesMap(row.schema.fieldNames).// do something here to filter out by plot
        }

        def close(errorOrNull: Throwable): Unit = {
          // Close the connection
        }
      }
    )
    .start()

  moviesWriteStream.awaitTermination()
}

如何在写过滤器中完成def process(row: Row)方法?由于某些原因,正则表达式在我的情况下效果不佳。但是我不知道该怎么做,因为Spark MLlib的NGram看起来没有这个要求。我以为Lucene不确定该方法是否正确,因此会在该方法中进行一些inline过滤。有想法吗?

apache-spark spark-structured-streaming n-gram
1个回答
0
投票

对于给定的句子(字符串数组),您可以创建n-gram,如示例中所示-https://spark.apache.org/docs/latest/ml-features.html#n-gram

基本上,

  1. 读取行,用标准分隔符(空格,逗号,分号)分割

  2. 将字符串数组传递给org.apache.spark.ml.feature.NGram

  3. Ngram.transform将为该句子创建n-gram,并将其作为新列添加到您的输入数据框中。

  4. 您可以对此n-gram列进行过滤。

© www.soinside.com 2019 - 2024. All rights reserved.