如何在Spark结构化流中基于Kafka主题名称将Spark模式应用于查询?

问题描述 投票:0回答:1

我有一个Spark结构化流作业,该作业基于subscribePattern从多个Kafka主题流式传输数据,对于每个Kafka主题,我都有一个Spark模式。从Kafka流式传输数据时,我想基于主题名称将Spark模式应用于Kafka消息。

考虑一下我有两个主题:custcustomers

基于subscribePattern(Java regex字符串)从Kafka流数据:

var df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "cust*")
  .option("startingOffsets", "earliest") 
  .load()
  .withColumn("value", $"value".cast("string"))
  .filter($"value".isNotNull)

上面的流查询从两个主题流数据。

假设我有两个Spark模式,每个主题一个:

var cust: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

var customers: StructType = new StructType()
    .add("id", IntegerType)
    .add("first_name", StringType)
    .add("last_name", StringType)
    .add("email", StringType)
    .add("address", StringType)

现在,我想基于主题名称应用Spark Schema,为此,我编写了一个udf,它读取主题名称并以DDL格式返回该模式:

val schema = udf((table: String) => (table) match {
    case ("cust")      => cust.toDDL
    case ("customers") => customers.toDDL
    case _             => new StructType().toDDL
  })

然后我在from_json方法内使用udf(我知道udf适用于每列):

val query = df
    .withColumn("topic", $"topic".cast("string"))
    .withColumn("data", from_json($"value", schema($"topic")))
    .select($"key", $"topic", $"data.*")
    .writeStream.outputMode("append")
    .format("console")
    .start()
    .awaitTermination()

这给了我以下正确的例外,因为from_json

期望DDL格式或StructType的String模式。
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);

我想知道如何实现这一目标?

任何帮助将不胜感激!

我有一个Spark结构化流式作业,该作业基于subscriptionPattern从多个Kafka主题流式传输数据,对于每个Kafka主题,我都有一个Spark模式。从...

scala apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

您在做什么是不可能的。您的query df不能有2个不同的架构。

© www.soinside.com 2019 - 2024. All rights reserved.