如何在pyspark中读取自定义格式的日期

问题描述 投票:2回答:2

我想使用spark.read()从.csv文件中提取数据,同时强制执行模式。但是,我无法将我的日期识别为时间戳。

首先,我创建一个虚拟文件来测试

%scala
Seq("1|1/15/2019 2:24:00 AM","2|test","3|").toDF().write.text("/tmp/input/csvDateReadTest")

然后我尝试读取它,并提供dateFormat字符串,但它无法识别我的日期,并将记录发送到badRecordsPath

df = spark.read.format('csv')
               .schema("id int, dt timestamp")
               .option("delimiter","|")
               .option("badRecordsPath","/tmp/badRecordsPath")
               .option("dateFormat","M/dd/yyyy hh:mm:ss aaa")
               .load("/tmp/input/csvDateReadTest")

结果,当我期望看到2时,我只得到df(ID 3)中的1条记录(ID 1和3)

df.show()

+---+----+
| id|  dt|
+---+----+
|  3|null|
+---+----+


csv apache-spark date-formatting azure-databricks
2个回答
0
投票

您必须将dateFormat更改为timestampFormat,因为您输入的是时间戳而不是日期。此外,timestamp格式的值应为mm/dd/yyyy h:mm:ss a

样本数据:

Seq(
"1|1/15/2019 2:24:00 AM",
"2|test",
"3|5/30/1981 3:11:00 PM"
).toDF().write.text("/tmp/input/csvDateReadTest")

随着时间戳的变化:

val df = spark.read.format("csv")
               .schema("id int, dt timestamp")
               .option("delimiter","|")
               .option("badRecordsPath","/tmp/badRecordsPath")
               .option("timestampFormat","mm/dd/yyyy h:mm:ss a")
               .load("/tmp/input/csvDateReadTest")

并输出:

+----+-------------------+
|  id|                 dt|
+----+-------------------+
|   1|2019-01-15 02:24:00|
|   3|1981-01-30 15:11:00|
|null|               null|
+----+-------------------+

请注意,ID为2的记录无法符合架构定义,因此将包含null。如果你想保留无效记录,你需要将timestamp列更改为字符串,在这种情况下输出将是:

+---+--------------------+
| id|                  dt|
+---+--------------------+
|  1|1/15/2019 2:24:00 AM|
|  3|5/30/1981 3:11:00 PM|
|  2|                test|
+---+--------------------+

更新:

为了将字符串dt更改为时间戳类型,您可以尝试使用df.withColumn("dt", $"dt".cast("timestamp")),尽管这会失败并将所有值替换为null。

您可以使用下一个代码实现此目的:

import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.util.{Date, Locale} 
import java.sql.Timestamp
import scala.util.{Try, Success, Failure}

val formatter = new SimpleDateFormat("mm/dd/yyyy h:mm:ss a", Locale.US)
df.map{ case Row(id:Int, dt:String) =>
    val tryParse = Try[Date](formatter.parse(dt))

    val p_timestamp = tryParse match {
        case Success(parsed) => new Timestamp(parsed.getTime())
        case Failure(_) => null
    }

    (id, p_timestamp)
}.toDF("id", "dt").show

输出:

+---+-------------------+
| id|                 dt|
+---+-------------------+
|  1|2019-01-15 02:24:00|
|  3|1981-01-30 15:11:00|
|  2|               null|
+---+-------------------+

0
投票

嗨,这里是示例代码

df.withColumn("times", 
              from_unixtime(unix_timestamp(col("df"), "M/dd/yyyy hh:mm:ss a"),
              "yyyy-MM-dd HH:mm:ss.SSSSSS"))
  .show(false)
© www.soinside.com 2019 - 2024. All rights reserved.