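I am trying to read both the headers and the payload from a Kafka message. I can read the payload and map it to a schema, but I am having trouble reading the header values. This is what I have so far: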
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", Utils.getKafkaBroker)
  //.option("subscribe", Utils.getKafkaTopic)
  .option("assign", s"""{"${Utils.getKafkaTopic}":[0]}""")
  .option("startingOffsets", "latest")
  .option("includeHeaders", "true")
  .option("failOnDataLoss", "false")
  .load()
val headerSchema = new StructType()
  .add("facilityCountryCode", StringType)
  .add("facilityNum", StringType)
  .add("WMTCorrelationId", StringType)
  .add("WMTIdempotencyKey", StringType)
  .add("eventTs", StringType)
val valueSchema = new StructType()
  .add("eventObject", new StructType()
    .add("baseDivisionCode", StringType)
    .add("countryCode", StringType)
    .add("dcNumber", LongType)
    .add("financialReportingGroup", StringType)
    .add("itemList", new ArrayType(
      new StructType(
        Array(
          StructField("itemNumber", LongType),
          StructField("itemUPC", StringType),
          StructField("unitOfMeasurement", StringType),
          StructField("availabletosellQty", LongType),
          StructField("turnAvailableQty", LongType),
          StructField("distroAvailableQty", LongType),
          StructField("ossIndicator", StringType),
          StructField("weightFormatType", StringType))),
      containsNull = true)))
val explodeddHeader= cast.select(from_json($"key",headerSchema).as("explodedHeader"),from_json($"value", valueSchema).as("exploded"),$"kafka_timestamp")
val fully_flattened=explodeddHeader......
fully_flattened.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start()
  .awaitTermination()
The payload values are mapped correctly, but the header values all come back as null:
facilityCountryCode|facilityNum|WMTCorrelationId|countryCode|dcNumber
|null |null |null |WM |US |6080
According to the Spark Structured Streaming + Kafka Integration Guide, each row in the stream has the following schema:
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
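Headers are not listed in that schema, so on a Spark version that exposes only these columns you will not be able to access them.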
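A quick way to see which columns your own Spark version exposes is to print the schema of the loaded stream. The sketch below is only illustrative: it assumes Spark 3.0 or later (where setting includeHeaders to true adds an optional headers column of type array<struct<key: string, value: binary>>) and the spark-sql-kafka connector on the classpath. The broker address, topic name, and the names KafkaSchemaCheck and hasHeaders are placeholders, not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSchemaCheck {
  // True when the loaded Kafka source exposes a top-level "headers" column.
  def hasHeaders(df: DataFrame): Boolean = df.columns.contains("headers")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-schema-check")
      .master("local[*]")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "my-topic")                     // placeholder topic
      .option("includeHeaders", "true")
      .load()

    // Lists every column the source provides for this Spark version.
    df.printSchema()
    println(s"headers column present: ${hasHeaders(df)}")

    spark.stop()
  }
}
```

If the headers column does not show up in the printed schema, the header values are not reachable from the DataFrame in that setup.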
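By the way, your code currently selects the key column and applies the header schema to it. Since the key most likely does not match that schema, from_json returns null for every header field.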
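If you are on Spark 3.0 or later and the headers column is present, a possible approach is to read the header values from that column instead of from key. The following is a minimal sketch, not a definitive implementation. It assumes the header values are UTF-8 text and that header keys are unique within a record (map_from_entries may fail on duplicate keys under the default settings). The names KafkaHeaderSketch, headerMap, and withHeaders are hypothetical helpers introduced for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, map_from_entries, struct, transform}

object KafkaHeaderSketch {
  // Converts headers (array<struct<key: string, value: binary>>) into map<string, string>.
  // Assumption: header values are UTF-8 text and keys are unique per record.
  val headerMap: Column = map_from_entries(
    transform(col("headers"), h => struct(h("key"), h("value").cast("string"))))

  // Adds one string column per requested header name.
  // A header that is absent is expected to come back as null under default (non-ANSI) settings.
  def withHeaders(df: DataFrame, names: Seq[String]): DataFrame =
    names.foldLeft(df.withColumn("headerMap", headerMap)) { (acc, name) =>
      acc.withColumn(name, col("headerMap").getItem(name))
    }.drop("headerMap")
}
```

With the stream from the question, this could be applied as KafkaHeaderSketch.withHeaders(df, Seq("facilityCountryCode", "facilityNum", "WMTCorrelationId")) before parsing value with from_json, so the header fields come from headers and only the payload goes through valueSchema.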