How to read Kafka header values in Spark Structured Streaming


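I'm trying to read both the headers and the payload of a Kafka message. I can read the payload and map it to a schema, but I have a problem reading the header values. This is what I have so far: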

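import java.util.concurrent.TimeUnit
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import spark.implicits._ // enables the $"column" syntax

val df = spark.readStream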
  .format("kafka")
  .option("kafka.bootstrap.servers", Utils.getKafkaBroker)
  //.option("subscribe", Utils.getKafkaTopic)
  .option("assign", s"""{"${Utils.getKafkaTopic}":[0]}""")
  .option("startingOffsets", "latest")
  .option("includeHeaders", "true")
  .option("failOnDataLoss","false")
  .load()

val headerSchema=new StructType()
  .add("facilityCountryCode", StringType)
  .add("facilityNum", StringType)
  .add("WMTCorrelationId", StringType)
  .add("WMTIdempotencyKey",StringType)
  .add("eventTs",StringType)

val valueSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode", StringType)
    .add("dcNumber", LongType)
    .add("financialReportingGroup", StringType)
    .add("itemList", new ArrayType(
      new StructType(
        Array(
          StructField("itemNumber", LongType),
          StructField("itemUPC", StringType),
          StructField("unitOfMeasurement", StringType),
          StructField("availabletosellQty", LongType),
          StructField("turnAvailableQty", LongType),
          StructField("distroAvailableQty", LongType),
          StructField("ossIndicator", StringType),
          StructField("weightFormatType", StringType))), containsNull = true)))



val explodeddHeader= cast.select(from_json($"key",headerSchema).as("explodedHeader"),from_json($"value", valueSchema).as("exploded"),$"kafka_timestamp")

val fully_flattened=explodeddHeader......
fully_flattened.writeStream
  .format("console")
  .option("truncate","false")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start()
  .awaitTermination()

The payload values are mapped correctly, but the header values come back as null:

facilityCountryCode|facilityNum|WMTCorrelationId|countryCode|dcNumber
|null               |null      |null   |WM              |US         |6080    
scala apache-spark apache-kafka streaming
1 Answer

According to the Spark Structured Streaming + Kafka Integration Guide, each row in the source has the following schema:

Column          Type
key             binary
value           binary
topic           string
partition       int
offset          long
timestamp       long
timestampType   int

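Headers are not listed there, so with the integration as described in that version of the guide you cannot access them. Since Spark 3.0, however, the Kafka source can add an optional headers column of type array<struct<key: string, value: binary>> when the includeHeaders option is set to "true", which your code already does.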
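A minimal sketch of keeping that headers column next to the payload, assuming Spark 3.0 or later with the spark-sql-kafka-0-10 connector on the classpath. The object name, broker address and topic name are hypothetical placeholders, not values from the question. Building the DataFrame only resolves the source schema; nothing connects to Kafka until a query is started.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object KafkaHeadersSource {
  // Builds a streaming DataFrame that keeps the Kafka "headers" column.
  // "localhost:9092" and "events" are hypothetical placeholders.
  def load(spark: SparkSession,
           brokers: String = "localhost:9092",
           topic: String = "events"): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      // Without this option the source does not produce a "headers" column.
      .option("includeHeaders", "true")
      .load()
      .select(
        col("value").cast("string").as("value"),
        // headers: array<struct<key: string, value: binary>>
        col("headers"),
        col("timestamp").as("kafka_timestamp"))
}
```

The header values stay binary at this point; each one still has to be looked up by its key and cast to a string before it can be treated like the fields of headerSchema.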

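By the way, your code currently selects key and applies the header schema to it. The message key most likely does not contain those fields, so from_json yields null for them, which is exactly the output you are seeing.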
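To fill the fields of headerSchema from the Kafka headers instead of from key, one option is to look each header up by name. The sketch below assumes (this is not confirmed in the question) that every field of headerSchema arrives as a separate Kafka header whose value is a UTF-8 string, and that Spark 3.0+ is used (for the higher-order filter function). The object names and sample values are hypothetical. It uses a small static DataFrame with the same headers type as the Kafka source so it can run without a broker; the same select expressions should also work on a streaming DataFrame.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, filter, size, when}

object HeaderFields {
  // Header names taken from the question's headerSchema.
  val headerNames: Seq[String] =
    Seq("facilityCountryCode", "facilityNum", "WMTCorrelationId",
        "WMTIdempotencyKey", "eventTs")

  // Value of the first header called `name`, decoded as a string;
  // null when no header with that key is present.
  def header(name: String): Column = {
    val matches = filter(col("headers"), h => h.getField("key") === name)
    when(size(matches) > 0, matches.getItem(0).getField("value").cast("string"))
  }

  // One top-level column per header name, plus the payload as a string.
  def flatten(df: DataFrame): DataFrame = {
    val cols = headerNames.map(n => header(n).as(n)) :+
      col("value").cast("string").as("value")
    df.select(cols: _*)
  }
}

object HeaderFieldsDemo {
  // Case classes mirroring the source's binary and array<struct> columns.
  case class Header(key: String, value: Array[Byte])
  case class Record(value: Array[Byte], headers: Seq[Header])

  // Hypothetical sample row with two of the five headers set.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(Record("{}".getBytes("UTF-8"),
      Seq(Header("facilityNum", "6080".getBytes("UTF-8")),
          Header("facilityCountryCode", "US".getBytes("UTF-8"))))).toDF()
  }
}
```

Applied to the df from the question (which already sets includeHeaders), HeaderFields.flatten(df) would take the place of from_json($"key", headerSchema); the payload can still be parsed with from_json against valueSchema. Looking headers up by key, rather than turning the array into a map, avoids an error if the same header key appears more than once in a message.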
