Reputation: 103
Using Spark 1.2.0.
I want to save data from a Kafka stream to Parquet, applying a schema to the JSON dataset when creating a table with jsonRDD, as described here.
The data comes from Kafka as nested JSON.
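For context, the Kafka-to-Parquet pipeline described here could look roughly like this. It is a minimal sketch, not a tested production job, and it assumes Spark 1.2 with the `spark-streaming-kafka` artifact on the classpath, the receiver-based `KafkaUtils.createStream` API, and hypothetical command-line arguments for the ZooKeeper quorum, consumer group, topic and output directory. The object name `KafkaJsonToParquet` is also hypothetical. The schema mirrors the nested example record in this question.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._ // Spark 1.2: SQLContext, StructType, StructField, StringType
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaJsonToParquet {
  // Explicit nested schema for records like
  // {"filename":"details","attributes":{"name":"Michael", "age":10}}
  val schema: StructType = StructType(Seq(
    StructField("filename", StringType, true),
    StructField("attributes", StructType(Seq(
      StructField("name", StringType, true),
      StructField("age", StringType, true))), true)))

  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: ZooKeeper quorum, consumer group, topic, output directory
    val Array(zkQuorum, groupId, topic, outputDir) = args
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaJsonToParquet"), Seconds(10))
    val sqlContext = new SQLContext(ssc.sparkContext)

    // Receiver-based stream of (key, message) pairs; keep only the JSON message
    val json = KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1)).map(_._2)

    json.foreachRDD { (rdd: RDD[String], time: Time) =>
      if (rdd.take(1).nonEmpty) {
        // Apply the explicit schema instead of inferring one per batch
        val batch = sqlContext.jsonRDD(rdd, schema)
        // One Parquet directory per batch, because the target path must not already exist
        batch.saveAsParquetFile(s"$outputDir/batch-${time.milliseconds}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Writing one directory per batch sidesteps the "path already exists" failure; merging the small files afterwards is a separate concern.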
Here is a basic example, reading from a text file, of how I've specified the schema for non-nested JSON:
//contents of json
hdfs@2db12:~$ hadoop fs -cat User/names.json
{"name":"Michael", "age":10}
{"name":"Andy", "age":30}
//create RDD from json
scala> val names= sc.textFile("hdfs://")
scala> names.collect().foreach(println)
{"name":"Michael", "age":10}
{"name":"Andy", "age":30}
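// specify schema (in Spark 1.2, StructType, StructField and StringType come from org.apache.spark.sql._)
import org.apache.spark.sql._
val schemaString = "name age gender"
val schema =
  StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))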
val peopleSchemaRDD = sqlContext.jsonRDD(names, schema)
scala> peopleSchemaRDD.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
scala> peopleSchemaRDD.registerTempTable("people")
scala> sqlContext.sql("SELECT name,age,gender FROM people").collect().foreach(println)
Is it possible to specify the schema for nested JSON? For example, a JSON record like this: {"filename":"details","attributes":{"name":"Michael", "age":10}}
Many Thanks
Upvotes: 1
Views: 10494
Reputation: 994
A Java version; the link below helped me:
create nested dataframe programmatically with Spark
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    // Top-level employee fields
    List<StructField> employeeFields = new ArrayList<>();
    employeeFields.add(DataTypes.createStructField("firstName", DataTypes.StringType, true));
    employeeFields.add(DataTypes.createStructField("lastName", DataTypes.StringType, true));
    employeeFields.add(DataTypes.createStructField("email", DataTypes.StringType, true));
    // Nested address records: an array of structs
    List<StructField> addressFields = new ArrayList<>();
    addressFields.add(DataTypes.createStructField("city", DataTypes.StringType, true));
    addressFields.add(DataTypes.createStructField("state", DataTypes.StringType, true));
    addressFields.add(DataTypes.createStructField("zip", DataTypes.StringType, true));
    ArrayType addressStruct = DataTypes.createArrayType(DataTypes.createStructType(addressFields));
    employeeFields.add(DataTypes.createStructField("addresses", addressStruct, true));
    StructType employeeSchema = DataTypes.createStructType(employeeFields);

    SparkSession sparkSession = SparkSession.builder().master(master).getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    SQLContext sqlCtx = sparkSession.sqlContext();
    // Employee is the answer's JavaBean (not shown)
    Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
    // The reader format and path are not in the original answer; json(...) and the path are hypothetical
    Dataset<Employee> rowDataset = sparkSession.read()
            .option("inferSchema", "false")
            .schema(employeeSchema)
            .json("employees.json")
            .as(employeeEncoder);
    rowDataset.createTempView("employeeView");
    sqlCtx.sql("select * from employeeView").show();
}
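Since the question is in Scala, here is roughly the same employee schema (three string fields plus an array of address structs) built in Scala. This is a sketch assuming Spark 1.2, where the type classes come from `org.apache.spark.sql._`; on Spark 1.3 and later, import `org.apache.spark.sql.types._` instead. The object name `EmployeeSchema` is hypothetical.

```scala
import org.apache.spark.sql._ // Spark 1.2; on 1.3+ use org.apache.spark.sql.types._

object EmployeeSchema {
  // Fields of each element in the "addresses" array
  val addressType: StructType = StructType(Seq(
    StructField("city", StringType, true),
    StructField("state", StringType, true),
    StructField("zip", StringType, true)))

  // Employee record: three string fields plus an array of address structs
  val schema: StructType = StructType(Seq(
    StructField("firstName", StringType, true),
    StructField("lastName", StringType, true),
    StructField("email", StringType, true),
    StructField("addresses", ArrayType(addressType, true), true)))

  def main(args: Array[String]): Unit = println(schema)
}
```

The resulting `schema` can be passed to `sqlContext.jsonRDD(rdd, EmployeeSchema.schema)` in the same way as the flat schema in the question.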
Upvotes: 3
Reputation: 967
You can use sqlContext.jsonFile() and let Spark infer the schema,
as long as at least one JSON record contains the gender field.
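A minimal sketch of the inference route, assuming Spark 1.2 and two hypothetical in-memory records instead of a file: `jsonRDD` without a schema argument is the RDD counterpart of `jsonFile`. The inferred `attributes` struct should include `gender` because one record carries it. The object name `InferNestedSchema` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferNestedSchema {
  // Hypothetical sample records; only the second one has a "gender" field
  val records: Seq[String] = Seq(
    """{"filename":"details","attributes":{"name":"Michael","age":10}}""",
    """{"filename":"details","attributes":{"name":"Andy","age":30,"gender":"male"}}""")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("InferNestedSchema").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    // No schema given: Spark scans the records and infers a nested struct for "attributes"
    val inferred = sqlContext.jsonRDD(sc.parallelize(records))
    inferred.printSchema()
    sc.stop()
  }
}
```

Inference costs an extra pass over the data and gives numbers a numeric type, so an explicit schema is still preferable for a stream where some batches may lack a field.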
Or define the schema in detail:
// schemaString = "name age gender", as in the question
val schema = StructType(
  StructField("filename", StringType, true) ::
  StructField("attributes", StructType(schemaString.split(" ").map(fieldName =>
    StructField(fieldName, StringType, true)
  )), true) :: Nil
)
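To show how such a nested schema is used end to end, here is a sketch that loads a record, queries nested fields and saves the result as Parquet. It assumes Spark 1.2 (the SchemaRDD API), a single hypothetical in-memory record, and a hypothetical output path `files.parquet`; `NestedSchemaQuery` and `buildSchema` are illustrative names, not part of Spark. Nested fields are selected with dot notation.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._ // Spark 1.2: SQLContext plus StructType, StructField, StringType

object NestedSchemaQuery {
  // Top-level "filename" plus a nested "attributes" struct built from a space-separated list
  def buildSchema(schemaString: String): StructType = StructType(
    StructField("filename", StringType, true) ::
    StructField("attributes", StructType(schemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true))), true) :: Nil)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("NestedSchemaQuery").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val files = sc.parallelize(Seq("""{"filename":"details","attributes":{"name":"Michael", "age":10}}"""))

    val filesSchemaRDD = sqlContext.jsonRDD(files, buildSchema("name age gender"))
    filesSchemaRDD.printSchema()
    filesSchemaRDD.registerTempTable("files")
    // Nested fields are addressed as struct.field
    sqlContext.sql("SELECT filename, attributes.name, attributes.age FROM files").collect().foreach(println)
    // Hypothetical output path; it must not exist yet
    filesSchemaRDD.saveAsParquetFile("files.parquet")
    sc.stop()
  }
}
```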
Upvotes: 3