无法使用 pyspark readstream 从 kafka 主题读取记录数组

问题描述 投票:0回答:1

我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 但是,该主题的实际数据是一组记录,我尝试在我的 avro 架构周围添加 [] 但不起作用。

单条记录的avro模式是

{
    "type": "record",
    "name": "data",
    "fields": [
        {
            "name": "x",
            "type": [
                "double",
                "null"
            ]
        },
        {
            "name": "y",
            "type": [
                "double",
                "null"
            ]
        }
    ]
}

但我需要的是一个 avro 模式,它可以解析这个记录的数组 [{},{}]

我知道我可以使用 pandas UDF,但我只想知道是否有本地方法(使用 spark API)来做到这一点。

apache-spark avro spark-structured-streaming
1个回答
0
投票

如果你的 Kafka 负载是一个数组,你的 Avro schema 需要像这样开始

{
"type": "array",
"items": {
    "type": "record",
    "name": "data",
    "fields": [
      ... 

您不能简单地在记录类型周围添加

[]

然后,Spark 应该为反序列化的值列返回一个 Struct 类型的数组

还值得一提 -

from_avro
,默认情况下,如果您的 Kafka 数据是使用 Confluent Schema Registry 生成的,则不起作用...

© www.soinside.com 2019 - 2024. All rights reserved.