Reputation: 3021
I have a large amount of log data that is semi-structured as CSV data. However, each individual row's columns are dependent on what type of row it is, indicated by a particular column.
Example data:
8/01/2018, person, 1, Bob, Loblaw, 32
8/01/2018, person, 2, Roger, McRoger, 55
8/03/2018, dog, Bella, 9, 1
8/05/2018, person, 3, Charlie, McCharles, 23
8/07/2018, dog, Scout, 5, 3
This particular example shows a semi-structured file with two schemas interspersed, roughly equating to these case classes:
case class Person(id: Int, firstName: String, lastName: String, age: Int)
case class Dog(name: String, age: Int, ownerId: Int)
I'm trying to figure out the best way to parse these interspersed data via Spark efficiently so that I can query the dataset, potentially joining between the various row types.
I can load CSV data into a structured schema when all rows are the same, but the heterogeneous nature of the rows in these files is stumping me. I've considered using Spark to read the data as text and then doing some kind of groupBy operation on the "type" column, at which point I could parse each group individually. However, I've not been able to formulate code to do so, as the semantics of DataFrames appear to be quite different from those of a standard Scala collection; e.g., groupBy in Spark is not equivalent to a Scala collection's groupBy, so far as I can tell.
I realize I could process these files with some sort of ETL prior to using Spark to normalize the data, but it just seems like it should be possible to skip that step and let Spark query the data as-is. Am I just fundamentally on the wrong path?
Upvotes: 1
Views: 317
Reputation: 7279
At large scales, data is often denormalized (nested structures, type mismatches, extra/missing fields, etc.). Trying to force-normalize semi-structured data into DataFrames can be cumbersome, especially with billions of records.
I suggest looking into the open-source Rumble engine, which is an extra layer on top of Spark. It was specifically designed for heterogeneous data scenarios like this one (Disclaimer: I am part of the team).
For example, it is possible to go through the document, and convert it to a mixed sequence of JSON objects (which do not have to respect a schema) that can be queried at will. The language is JSONiq:
let $my-heterogeneous-list :=
  (: first we convert each CSV line to a JSON object, switching on person/dog :)
  for $i in text-file("/path/to/heterogeneous-csv.txt")
  let $j := tokenize($i, ", ")
  let $person := $j[2] eq "person"
  return if($person)
    then {
      "kind" : "person",
      "date" : $j[1],
      "id" : $j[3],
      "first" : $j[4],
      "last" : $j[5],
      "age" : $j[6]
    } else {
      "kind" : "dog",
      "date" : $j[1],
      "name" : $j[3],
      "age" : $j[4],
      "owner" : $j[5]
    }
(: now we can query and re-arrange :)
return
  for $dog in $my-heterogeneous-list[$$.kind eq "dog"]
  return {|
    project($dog, ("date", "name")),
    { "owner" : $my-heterogeneous-list[$$.id eq $dog.owner] }
  |}
This returns a list of dogs with their nested owners (denormalized):
{
  "date" : "8\/03\/2018",
  "name" : "Bella",
  "owner" : {
    "kind" : "person",
    "date" : "8\/01\/2018",
    "id" : "1",
    "first" : "Bob",
    "last" : "Loblaw",
    "age" : "32"
  }
}
{
  "date" : "8\/07\/2018",
  "name" : "Scout",
  "owner" : {
    "kind" : "person",
    "date" : "8\/05\/2018",
    "id" : "3",
    "first" : "Charlie",
    "last" : "McCharles",
    "age" : "23"
  }
}
Upvotes: 1
Reputation: 25939
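import spark.implicits._  // needed for the $"..." column syntax and for .as[Person]
import org.apache.spark.sql.functions.trim

case class Person(id: Int, firstName: String, lastName: String, age: Int)

val df = spark.read.csv("..whateverpath.csv")
// Keep only the "person" rows, then trim, cast and name the positional columns.
val p = df.filter(trim($"_c1") === "person").select(
  trim($"_c2").cast("int").alias("id"),
  trim($"_c3").alias("firstName"),
  trim($"_c4").alias("lastName"),
  trim($"_c5").cast("int").alias("age")
).as[Person]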
The Dog case class can be handled similarly. Note that this breaks if the first row is shorter than later rows, since without a schema Spark takes the number of columns from the first row; in that case, specify a schema explicitly.
To avoid reading the files twice, cache the DataFrame with df.cache() before you process it.
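As a rough sketch of how these pieces might fit together, the following reads the rows with an explicit six-column string schema, caches the result, builds both typed Datasets and joins dogs to their owners. The object name HeterogeneousCsv, the method dogsWithOwners, the local master setting and the inlined sample rows are assumptions made for illustration, and it assumes a Spark version (2.2 or later) where spark.read.csv accepts a Dataset[String]; with a real file you would pass the path instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.trim
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class Person(id: Int, firstName: String, lastName: String, age: Int)
case class Dog(name: String, age: Int, ownerId: Int)

// Hypothetical wrapper object, used only to make the sketch self-contained.
object HeterogeneousCsv {

  // Returns (dog name, owner first name, owner last name) for every dog with a known owner.
  def dogsWithOwners(spark: SparkSession): Seq[(String, String, String)] = {
    import spark.implicits._

    // Sample rows from the question, inlined so no input file is needed.
    val lines = Seq(
      "8/01/2018, person, 1, Bob, Loblaw, 32",
      "8/01/2018, person, 2, Roger, McRoger, 55",
      "8/03/2018, dog, Bella, 9, 1",
      "8/05/2018, person, 3, Charlie, McCharles, 23",
      "8/07/2018, dog, Scout, 5, 3"
    ).toDS()

    // Explicit schema: six string columns, wide enough for the longest row type.
    // Shorter rows (dogs) get null in the trailing column in the default permissive mode.
    val schema = StructType((0 to 5).map(i => StructField(s"_c$i", StringType, nullable = true)))

    // Cache so the two filters below do not parse the input twice.
    val df = spark.read.schema(schema).csv(lines).cache()

    val people = df.filter(trim($"_c1") === "person").select(
      trim($"_c2").cast("int").alias("id"),
      trim($"_c3").alias("firstName"),
      trim($"_c4").alias("lastName"),
      trim($"_c5").cast("int").alias("age")
    ).as[Person]

    val dogs = df.filter(trim($"_c1") === "dog").select(
      trim($"_c2").alias("name"),
      trim($"_c3").cast("int").alias("age"),
      trim($"_c4").cast("int").alias("ownerId")
    ).as[Dog]

    // Join each dog to its owner via ownerId = id.
    dogs.join(people, dogs("ownerId") === people("id"))
      .select(dogs("name"), people("firstName"), people("lastName"))
      .as[(String, String, String)]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("heterogeneous-csv").getOrCreate()
    dogsWithOwners(spark).foreach(println)
    spark.stop()
  }
}
```

With the sample rows, this should pair Bella with Bob Loblaw and Scout with Charlie McCharles. Reading every column as a string and casting per row type keeps the schema independent of which row type happens to come first.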
Upvotes: 2