使用 Spark Mongo 连接器从 Mongo 读取数据时如何删除或跳过不匹配的数据类型

问题描述 投票:0回答:1

我正在尝试使用 Spark Mongo 连接器从 Mongo 读取数据,尝试加载 100M+ 行。

有谁知道如果使用我的预定义架构存在数据类型不匹配,我如何忽略行?有一些日期字段是时间戳,但有人插入了字符串类型。其他领域也有类似的问题。

我想知道如何处理数据类型不匹配,Spark Mongo Connector 中不提供

DROPMALFORMED
功能。

任何建议或指示都会有帮助。

spark-mongo-connector 10.1.1, delta 2.3, Spark EMR 3.3

例如Mongo中有一个Date类型的field1,其中一条记录被插入为日期的String类型或错误的值。 Field2 为整数,但又以字符串形式插入 1 条或多条记录,依此类推。

mongodb apache-spark pyspark apache-spark-sql delta-lake
1个回答
0
投票

您可以尝试如下所示:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.read.format("mongo").load()

val correctedDf = df
  .withColumn("field1", when($"field1".cast("date").isNotNull, $"field1".cast("date"))
                           .otherwise(lit(null))) // Replace with appropriate default for your use-cas
    ```
© www.soinside.com 2019 - 2024. All rights reserved.