我正在尝试编写从Kafka读取的有状态Spark结构化流作业。作为要求的一部分,我需要在流中添加“ event_time”作为附加列。我正在尝试这样的事情:
val schema = spark.read.json("sample-data/test.json").schema
val myStream = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "myTopic")
.load()
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
val withEventTime = df.selectExpr("*", "cast (value.arrivalTime as timestamp) as event_time")
但是我一直收到消息:
给定输入列,无法解析'arrivalTime':[值]
我如何引用JSON中的所有元素?
我相信我能够使用以下方法解决此问题:
val withEventTime = df.withColumn("event_time",to_timestamp(col("value. arrivalTime")))
不确定为什么这样行得通,而不是另一个。