Reputation: 7986
I have a load of case classes which I've used in Spark to save data as Parquet, e.g.:
case class Person(userId: String,
                  technographic: Option[Technographic] = None,
                  geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser],
                         devices: Seq[Device],
                         oss: Seq[Os])

case class Browser(family: String,
                   major: Option[String] = None,
                   language: String
...
How can I convert the data on disk back to these case classes?
I need to be able to select multiple columns and explode them so that for each list (e.g. browsers) all of the sub-lists have the same lengths.
E.g. Given this original data:
Person(userId="1234",
  technographic=Some(Technographic(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en")),
    Browser(family=None, major=None, language=Some("en-us")),
    Browser(family=Some("Firefox"), major=None, language=None)
  ))),
  geographic=Some(Geographic(...))
)
I need, e.g., the browser data to come out as follows (as well as being able to select all columns):
family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None
which I could get if Spark could explode each list item. Currently it will just do something like this (and in any case explode won't work with multiple columns):
browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
So how can I reconstruct a user's record (the entire set of case classes that produced a row of data) from all this nested optional data using Spark 1.5.2?
One possible approach is:
val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = sqlContext.sql("desc person")
sqlContext.sql("select * from person").map { x =>
  ... // somehow zip `fields` with the values so that I can
      // access values by column name instead of index
      // (which is brittle), but how?
}
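For example, I imagine something roughly like this (untested sketch), zipping the column names with each row's values so lookups are by name:
// Untested sketch of what I mean: capture the column names up front and
// zip them with each row's values.
val fieldNames = df.columns
df.map { row =>
  val byName = fieldNames.zip(row.toSeq).toMap
  byName("userId")  // but the values are Any, and the nested structs/arrays
                    // (technographic, browsers, ...) still need unpacking
}
But that still doesn't get me back to the case classes.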
Upvotes: 2
Views: 3765
Reputation: 12988
Given
case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])

case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)
and some convenience types/functions for org.apache.spark.sql.Row
import org.apache.spark.sql.Row

type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = r.getIntOpt(n).get

  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}
then parsing can be organized in some functions:
def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}
so you can write
df.map(toPerson).collect().foreach(println)
I have organized the parse functions as "stand-alone" methods. I'd normally put these either as apply methods in the companion object of the case class, or as implicit value classes for Row. The reason for plain functions here is that they are easier to paste into the spark-shell.
Each parse function handles plain columns and arrays directly, but delegates to another function when it encounters a collection (Seq and Option - these represent the next nesting level).
The implicit class should extend AnyVal, but again that cannot be pasted into the spark-shell.
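Outside the shell, a compiled variant might look roughly like this (sketch only; the getOpt logic is the same as above and the remaining getters are unchanged):
implicit class RichRow(val r: Row) extends AnyVal {
  // value class: no wrapper object is allocated per Row
  def getOpt[T](n: String): Option[T] =
    if (r.isNullAt(r.fieldIndex(n))) None else Some(r.getAs[T](n))
  // ... the other getters exactly as above
}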
Upvotes: 5
Reputation: 7986
To elaborate on the accepted answer: it doesn't correctly deal with null values. You need to try to cast the value to a String to find out whether it's null. That cast only succeeds if the value is null; if the value is non-null it results in a ClassCastException.
Confused? Here's teh codez:
import scala.util.{Success, Try}

// reuses the `A` (WrappedArray) alias and the Row import from the accepted answer
implicit class RichRow(val r: Row) extends AnyVal {

  def getBoolean(n: String) = r.getAs[Boolean](n)
  def getBooleanOpt(n: String) = Try(r.getString(n)) match {
    case Success(_) => None
    case _ => Option(r.getBoolean(n))
  }

  def getString(n: String) = r.getAs[String](n)
  def getStringOpt(n: String) = Option(r.getString(n))

  def getLong(n: String) = r.getAs[Long](n)
  def getLongOpt(n: String) = Try(r.getString(n)) match {
    case Success(_) => None
    case _ => Option(r.getLong(n))
  }

  def getInt(n: String) = r.getAs[Int](n)
  def getIntOpt(n: String) = Try(r.getString(n)) match {
    case Success(_) => None
    case _ => Option(r.getInt(n))
  }

  def getFloat(n: String) = r.getAs[Float](n)
  def getFloatOpt(n: String) = Try(r.getString(n)) match {
    case Success(_) => None
    case _ => Option(r.getFloat(n))
  }

  def getArray[T](n: String) = r.getAs[A[T]](n)
  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String): A[Row] = r.getAs[A[Row]](n)
}
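For example, with a row from the question's browsers array (illustrative only):
// Illustrative usage, assuming `row` has a nullable "major" cell:
row.getIntOpt("major")  // Try(getString) succeeds on a null cell => None
// For a non-null cell, getString throws a ClassCastException instead,
// so the second case returns Option(r.getInt("major")), e.g. Some(7).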
Upvotes: 1