user3865770

Reputation: 425

Spark avro to parquet

I have a stream of Avro-formatted data (JSON encoded) which needs to be stored as Parquet files. I could only manage this:

val df = sqc.read.json(jsonRDD).toDF()

and write the df as parquet.
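i.e., roughly (a sketch; the output path is illustrative):

```scala
// write the DataFrame with the JSON-inferred schema as Parquet
df.write.parquet("/tmp/output.parquet")
```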

Here the schema is inferred from the JSON. But I already have the avsc file and I don't want Spark to infer the schema from the JSON.

Also, with the above approach the Parquet files store the schema info as StructType and not as avro.record.type. Is there a way to store the Avro schema information as well?

SPARK - 1.4.1

Upvotes: 4

Views: 5114

Answers (2)

Avi Chalbani

Reputation: 880

You can programmatically specify the schema:

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people: an RDD[String] of "name,age" lines) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

please see: http://spark.apache.org/docs/latest/sql-programming-guide.html

spark-avro then maps Spark SQL types to Avro types as follows:

  • Spark SQL type -> Avro type
  • ByteType -> int
  • ShortType -> int
  • DecimalType -> string
  • BinaryType -> bytes
  • TimestampType -> long
  • StructType -> record
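For example (a sketch; the field names are illustrative), a DataFrame whose schema contains a nested StructType will be written by spark-avro as a nested Avro record:

```scala
import org.apache.spark.sql.types._

// top-level StructType -> top-level Avro record
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  // nested StructType -> nested Avro record named "address"
  StructField("address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("zip", StringType, nullable = true)
  )), nullable = true)
))
```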

You can write Avro records as follows:

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq((2012, 8, "Batman", 9.8),
        (2012, 8, "Hero", 8.7),
        (2012, 7, "Robot", 5.5),
        (2011, 7, "Git", 2.0))
        .toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/output")
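To get from those Avro files to Parquet (the original goal), one sketch, assuming the same sqlContext and illustrative paths, is to read them back with spark-avro and write Parquet:

```scala
// read the Avro output back and persist it as Parquet
val avroDF = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/output")
avroDF.write.parquet("/tmp/output-parquet")
```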

Upvotes: 0

user3865770

Reputation: 425

Ended up using the answer to this question: avro-schema-to-spark-structtype

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
    // write an empty Avro file that carries the schema, then let spark-avro read it back
    val dummyFile = File.createTempFile("avro_dummy", "avro")
    val datumWriter = new GenericDatumWriter[GenericRecord]()
    datumWriter.setSchema(avroSchema)
    val writer = new DataFileWriter[GenericRecord](datumWriter).create(avroSchema, dummyFile)
    writer.flush()
    writer.close()
    val df = sqc.read.format("com.databricks.spark.avro").load(dummyFile.getAbsolutePath)
    df.schema
}
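A usage sketch (the avsc path and jsonRDD are assumptions): parse the existing avsc, derive the StructType, and pass it when reading the JSON so Spark doesn't infer:

```scala
// parse the existing .avsc instead of letting Spark infer from the JSON
val avroSchema = new Schema.Parser().parse(new File("/path/to/schema.avsc"))
val sparkSchema = getSparkSchemaForAvro(sqc, avroSchema)

// apply the derived schema when reading the JSON stream, then write Parquet
val df = sqc.read.schema(sparkSchema).json(jsonRDD)
df.write.parquet("/tmp/output.parquet")
```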

Upvotes: 2
