MongoDB Spark连接器中的withPipeline函数在哪里

问题描述 投票:0回答:1

我正在尝试将MongoDB中的一些数据加载到Spark中。我已经定义了一个ReadConfig来指定数据库和集合。我还想应用一个过滤器,以避免丢失所有的集合。我跟随https://docs.mongodb.com/spark-connector/master/scala/aggregation/的例子如下:

val rc = ReadConfig(Map(“database” - >“myDB”,“collection” - >“myCol”),Some(ReadConfig(spark)))

val rdd = MongoSpark.load(spark,rc)

但是rdd没有任何名为withPipeline的函数(似乎它生成了一个regualr DataFrame而不是MongoRDD)我是否错过了导入的东西?我已经进口了

import com.mongodb.spark._

import spark.implicits._

mongodb scala apache-spark pipeline connector
1个回答
0
投票

我猜你在使用Spark.sparkContext时使用spark 2.0使用MongoSpark.load

val collectionDf = MongoSpark.load(spark.sparkContext, readConfig)
val aggregatedRdd = collectionDf.withPipeline(Seq(Document.parse("{ $match: { _id: 'value' } }")))
© www.soinside.com 2019 - 2024. All rights reserved.