Reputation: 300
I am trying to convert JSON documents from a source I cannot control into Parquet. The schema is self-describing and evolving.
Spaces have to be removed from field names at every level of the tree before the Spark DataFrame can be written, because the Parquet writer rejects them.
How can Spark (in Python) remove spaces from field names in a nested schema?
The following works on a flat schema, but does not work on a nested tree.
from pyspark.sql.functions import col

exprs = [col(column).alias(column.replace(' ', '_')) for column in jsonDF.columns]
newDF = jsonDF.select(*exprs)
newDF.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/path/to/parquet_test1")
Here is a representative made-up schema. Note the spaces in field names at different depths of the tree.
root
 |-- Browser Info: struct (nullable = true)
 |    |-- Operating System: struct (nullable = true)
 |    |    |-- Android: double (nullable = true)
 |    |    |-- BlackBerryOS: double (nullable = true)
 |    |    |-- ChromeOS: double (nullable = true)
 |    |    |-- Linux: double (nullable = true)
 |    |    |-- Mac OS X: double (nullable = true)
 |    |    |-- Windows: double (nullable = true)
 |    |    |-- iOS: double (nullable = true)
 |    |-- Browser Types: struct (nullable = true)
 |    |    |-- Chrome: double (nullable = true)
 |    |    |-- Firefox: double (nullable = true)
 |    |    |-- IE 10: double (nullable = true)
 |    |    |-- IE 8: double (nullable = true)
 |    |    |-- IE 9: double (nullable = true)
 |    |    |-- Opera: double (nullable = true)
 |    |    |-- Safari 5: double (nullable = true)
 |    |    |-- Safari 6: double (nullable = true)
 |    |    |-- Safari 7: double (nullable = true)
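The goal is the same tree with the spaces replaced at every level, e.g. (abbreviated, remaining fields follow the same pattern):
root
 |-- Browser_Info: struct (nullable = true)
 |    |-- Operating_System: struct (nullable = true)
 |    |    |-- Mac_OS_X: double (nullable = true)
 |    |-- Browser_Types: struct (nullable = true)
 |    |    |-- IE_10: double (nullable = true)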
Upvotes: 2
Views: 1553
Reputation: 1008
I can provide the code in Scala; I hope that helps. It is a bit of a dirty way to implement what you are asking, but it does the trick.
import scala.collection.mutable.ListBuffer

val jsonDF = sqlContext.read.json("path")

// Two independent iterators are needed: mapping over oldSchema itself
// would consume it, so the cleaned names are built from jsonDF.columns.
val oldSchema = jsonDF.columns.toIterator
val newSchema = jsonDF.columns.map(x => x.replaceAll(" ", "")).toIterator

// Build "`old name` as newname" expressions; the backticks are required
// because the original column names contain spaces.
val schema = new ListBuffer[String]()
while (oldSchema.hasNext) {
  val oldSchemaValue = oldSchema.next()
  val newSchemaValue = newSchema.next()
  schema += s"`${oldSchemaValue}` as ${newSchemaValue}"
}

val newJsonDF = jsonDF.selectExpr(schema.toList: _*)
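This renames only the top-level columns, though. To reach the nested levels from the question as well, here is a minimal PySpark sketch that rebuilds the schema recursively and casts each column to its cleaned type. It assumes only struct nesting (arrays or maps of structs would need extra cases), and strip_spaces is just an illustrative helper name:
from pyspark.sql.types import StructField, StructType

def strip_spaces(data_type):
    # Recursively rebuild a data type with spaces removed from all struct
    # field names; non-struct types are returned unchanged.
    if isinstance(data_type, StructType):
        return StructType([
            StructField(field.name.replace(" ", ""),
                        strip_spaces(field.dataType),
                        field.nullable)
            for field in data_type.fields
        ])
    return data_type

# Casting a struct column to a struct type with the same layout but cleaned
# field names renames the nested fields; the alias renames the top-level
# column itself. Bracket lookup copes with names that contain spaces.
cleaned = jsonDF.select(*[
    jsonDF[field.name]
        .cast(strip_spaces(field.dataType))
        .alias(field.name.replace(" ", ""))
    for field in jsonDF.schema.fields
])

cleaned.write.format("parquet").mode("overwrite").save("/path/to/parquet_test1")
On the sample schema this produces BrowserInfo.OperatingSystem.MacOSX and so on, which the Parquet writer accepts.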
Upvotes: 1