如何在spark中将Avro Schema对象转换为StructType

问题描述 投票:0回答:6

我有一个 Row 类型的 RDD,即 RDD[Row] 和 avro 架构对象。我需要使用此信息创建一个数据框。

我需要将 avro 模式对象转换为 StructType 以创建 DataFrame。

你能帮忙吗?

apache-spark schema rdd avro
6个回答
5
投票

在 pyspark 2.4.7 中,我的解决方案是使用 avroschema 创建一个空数据帧,然后从此空数据帧中获取 StructType 对象。

with open('/path/to/some.avsc','r') as avro_file:
    avro_scheme = avro_file.read()

df = spark\
    .read\
    .format("avro")\
    .option("avroSchema", avro_scheme)\
    .load()

struct_type = df.schema


5
投票

Wisnia 的答案有效,但仅供参考,我和我的同事想出的另一个解决方案如下:

avro_schema = "..."

java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)

java_struct_schema = java_schema_type.dataType()
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)

5
投票

com.databricks.spark.avro 有一个课程可以帮助您解决此问题

 StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();

4
投票

更新截至2020-05-31

如果您使用的是较新的 Spark 版本的 scala

2.12
,请使用下面的内容。

sbt:

scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]

2
投票

有在 pyspark 中做同样事情的例子吗?下面的代码对我有用,但应该有其他更简单的方法来做到这一点

# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4

import requests
import os
import avro.schema

from pyspark.sql.types import StructType

schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)

spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text))

1
投票

Databrics在spark-avro包中支持avro相关实用程序,在sbt中使用以下依赖项 “com.databricks”%“spark-avro_2.11”%“3.2.0”

代码

*

val sqlSchema=SchemaConverters.toSqlType(avroSchema)

*

在“3.2.0”版本之前,“toSqlType”是私有方法,因此如果您使用的版本早于 3.2,请在您自己的 util 类中复制完整方法,否则升级到最新版本。

© www.soinside.com 2019 - 2024. All rights reserved.