Spark Streaming:从Kafka读取JSON并添加event_time

问题描述 投票:0回答:1

我正在尝试编写从Kafka读取的有状态Spark结构化流作业。作为要求的一部分,我需要在流中添加“ event_time”作为附加列。我正在尝试这样的事情:

val schema = spark.read.json("sample-data/test.json").schema
val myStream = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "myTopic")
      .load()
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
val withEventTime = df.selectExpr("*", "cast (value.arrivalTime as timestamp) as event_time")

但是我一直收到消息:

给定输入列,无法解析'arrivalTime':[值]

我如何引用JSON中的所有元素?

json scala apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

我相信我能够使用以下方法解决此问题:

val withEventTime = df.withColumn("event_time",to_timestamp(col("value. arrivalTime")))

不确定为什么这样行得通,而不是另一个。

© www.soinside.com 2019 - 2024. All rights reserved.