Reputation: 141
I have an RDD of type Row, i.e. RDD[Row], and an Avro schema object. I need to create a DataFrame with this info.
I need to convert the Avro schema object into a StructType for creating the DataFrame.
Can you please help?
Upvotes: 12
Views: 16909
Reputation: 2682
com.databricks.spark.avro has a class to help you with this:
StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();
Upvotes: 6
Reputation: 417
The answer from Wisnia works, but FYI another solution my coworkers and I came up with was the following:
import json
from pyspark.sql.types import StructType

avro_schema = "..."
java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)
java_struct_schema = java_schema_type.dataType()
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)
Upvotes: 6
Reputation: 352
Any example of doing the same in pyspark? The code below works for me, but there should be an easier way to do this.
# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4
import json
import requests
from pyspark.sql.types import StructType

schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)
spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text)
)
# spark_type is a JVM object; convert it to a Python StructType via its JSON form
struct_type = StructType.fromJson(json.loads(spark_type.dataType().json()))
Upvotes: 3
Reputation: 61
In pyspark 2.4.7 my solution is to create an empty dataframe with the Avro schema and then take the StructType object from this empty dataframe.
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_scheme = avro_file.read()

df = spark \
    .read \
    .format("avro") \
    .option("avroSchema", avro_scheme) \
    .load()

struct_type = df.schema
Upvotes: 6
Reputation: 1832
Updated as of 2020-05-31
Use the below if you're on Scala 2.12 with a newer Spark version.
sbt:
scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]
Upvotes: 4
Reputation: 479
Databricks provides Avro-related utilities in the spark-avro package; use the below dependency in sbt: "com.databricks" % "spark-avro_2.11" % "3.2.0"
Code
val sqlSchema = SchemaConverters.toSqlType(avroSchema)
Before version 3.2.0, toSqlType was a private method, so if you are using a version older than 3.2, copy the complete method into your own util class; otherwise, upgrade to the latest version.
Upvotes: 1