我有一个Spark结构化流作业,该作业基于subscribePattern
从多个Kafka主题流式传输数据,对于每个Kafka主题,我都有一个Spark模式。从Kafka流式传输数据时,我想基于主题名称将Spark模式应用于Kafka消息。
考虑一下我有两个主题:cust和customers。
基于subscribePattern
(Java regex字符串)从Kafka流数据:
var df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "cust*")
.option("startingOffsets", "earliest")
.load()
.withColumn("value", $"value".cast("string"))
.filter($"value".isNotNull)
上面的流查询从两个主题流数据。
假设我有两个Spark模式,每个主题一个:
var cust: StructType = new StructType()
.add("name", StringType)
.add("age", IntegerType)
var customers: StructType = new StructType()
.add("id", IntegerType)
.add("first_name", StringType)
.add("last_name", StringType)
.add("email", StringType)
.add("address", StringType)
现在,我想基于主题名称应用Spark Schema,为此,我编写了一个udf,它读取主题名称并以DDL格式返回该模式:
val schema = udf((table: String) => (table) match {
case ("cust") => cust.toDDL
case ("customers") => customers.toDDL
case _ => new StructType().toDDL
})
然后我在from_json方法内使用udf(我知道udf适用于每列):
期望DDL格式或StructType的String模式。val query = df .withColumn("topic", $"topic".cast("string")) .withColumn("data", from_json($"value", schema($"topic"))) .select($"key", $"topic", $"data.*") .writeStream.outputMode("append") .format("console") .start() .awaitTermination()
这给了我以下正确的例外,因为from_json
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);
我想知道如何实现这一目标?
任何帮助将不胜感激!
我有一个Spark结构化流式作业,该作业基于subscriptionPattern从多个Kafka主题流式传输数据,对于每个Kafka主题,我都有一个Spark模式。从...
您在做什么是不可能的。您的query
df不能有2个不同的架构。