I currently have data in a Spark DataFrame that is formatted like this:
```
Timestamp  Number
---------  ------
M-D-Y      3
M-D-Y      4900
```
The timestamps are not uniform or regularly spaced. For example, one value might be dated March 1, 2015 and the next value in the table September 1, 2015, and there can also be multiple entries for the same date.
So I wanted to do two things
Parsing dates is relatively easy with built-in functions, by combining `unix_timestamp` with a simple type cast. The result looks like this:
```
// +---------------------+
// |_c0                  |
// +---------------------+
// |2015-03-01 00:00:00.0|
// +---------------------+
```
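With the DataFrame DSL, the equivalent code would be something like this:
```scala
import org.apache.spark.sql.functions.unix_timestamp
import spark.implicits._ // for the $"..." column syntax; assumes a SparkSession named `spark`
// "MMMM" matches full month names such as "March" (Spark 3.x may reject them under "MMM")
unix_timestamp($"Timestamp", "MMMM d, yyyy").cast("timestamp")
```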
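The choice between `MMM` and `MMMM` matters on newer Spark versions. As I understand it, Spark 3.x parses patterns with java.time-style rules, where `MMM` matches only abbreviated month names ("Mar") and `MMMM` matches full ones ("March"), while the SimpleDateFormat-based parser in older versions accepted full names for either. The plain-Scala sketch below (no Spark involved; `parseDate` and `monthDayYear` are hypothetical names) shows the `MMMM d, yyyy` pattern against the sample strings from the question:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

// "MMMM" = full month name ("March"); with java.time's default strict parsing,
// "MMM" would only match abbreviations such as "Mar".
// Locale.ENGLISH keeps month names independent of the JVM's default locale.
val monthDayYear: DateTimeFormatter =
  DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH)

// Returns None for unparseable input, loosely like Spark returning null
def parseDate(s: String): Option[LocalDate] =
  Try(LocalDate.parse(s, monthDayYear)).toOption
```

Pinning the locale is a defensive choice: month names are locale-dependent text, so a formatter built with the JVM default locale could fail on English input on a machine configured differently.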
There are several tricks for filling in missing entries. The simplest is to reuse the parsing logic above to generate reference data. First, let's create a few helpers:
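```scala
def leap(year: Int): Boolean =
  ((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)
// "yyyy w" strings for weeks 1..52 (a 53rd ISO week, as in 2015, is not covered)
def weeksForYear(year: Int): Seq[String] = (1 to 52).map(w => s"$year $w")
// "yyyy D" (day-of-year) strings, 365 or 366 per year
def daysForYear(year: Int): Seq[String] =
  (1 to (if (leap(year)) 366 else 365)).map(d => s"$year $d")
```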
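As an alternative to building "yyyy D" strings and parsing them back, java.time can enumerate the dates of a year directly, and `Year.length()` already accounts for leap years. This is a sketch of a swapped-in technique, not what the answer above does; `datesForYear` is a hypothetical name:

```scala
import java.time.{LocalDate, Year}

// Every calendar date of `year`. Year.length() returns 365 or 366, so the
// leap-year check is handled by java.time rather than by hand.
def datesForYear(year: Int): Seq[LocalDate] =
  (1 to Year.of(year).length()).map(day => LocalDate.ofYearDay(year, day))
```

To use such dates in Spark, each `LocalDate` could be converted with `java.sql.Date.valueOf` before calling `toDF`, which avoids depending on how a given Spark version parses day-of-year patterns.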
and some example reference data (here for weeks, but you can do the same for days):
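```scala
import org.apache.spark.sql.functions.{unix_timestamp, weekofyear, year}
import spark.implicits._ // for toDF and $"..."; assumes a SparkSession named `spark`
val exprs = Seq(year($"date").alias("year"), weekofyear($"date").alias("week"))
// Caution: "w" follows locale week rules (and Spark 3.x's default parser rejects it),
// while weekofyear uses ISO weeks, so boundary weeks may not line up.
val weeks2015 = Seq(2015)
  .flatMap(weeksForYear _)
  .toDF("date")
  .withColumn("date", unix_timestamp($"date", "yyyy w").cast("timestamp"))
  .select(exprs: _*)
```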
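Spark's `weekofyear` follows ISO-8601 (weeks start on Monday, and week 1 is the first week with more than three days), so some years have 53 weeks; 2015 is one of them, which means a `1 to 52` range misses a week. The plain-Scala sketch below (no Spark; `yearWeek` and `weekKeysForYear` are hypothetical names) derives the reference keys from every day of the year instead of from "yyyy w" strings, pairing the calendar year with the ISO week the way `year()` and `weekofyear()` are combined above:

```scala
import java.time.{LocalDate, Year}
import java.time.temporal.IsoFields

// (calendar year, ISO-8601 week) of one date, mirroring Spark's year() and weekofyear()
def yearWeek(d: LocalDate): (Int, Int) =
  (d.getYear, d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR))

// Every distinct (year, week) key that dates inside `year` can produce,
// derived from each day of the year instead of from "yyyy w" strings
def weekKeysForYear(year: Int): Seq[(Int, Int)] =
  (1 to Year.of(year).length())
    .map(day => yearWeek(LocalDate.ofYearDay(year, day)))
    .distinct
```

Note that January 1-3, 2016 fall in ISO week 53 of 2015, so for 2016 this yields the key (2016, 53) alongside weeks 1 to 52. In Spark, the same idea would be one row per day followed by `select(exprs: _*)` and `distinct`.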
Next, transform the original data:
```scala
val df = Seq(
  ("March 1, 2015", 3), ("September 1, 2015", 4900)).toDF("Timestamp", "Number")
// Parse the strings, then keep only year, week and Number
val dfParsed = df
  .withColumn("date", unix_timestamp($"Timestamp", "MMMM d, yyyy").cast("timestamp"))
  .select(exprs :+ $"Number": _*)
```
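To check which keys the sample rows should get without starting Spark, the same transformation can be mirrored in plain Scala. This is a sketch, not Spark's implementation; `rows`, `monthDayYear` and `toKeyed` are hypothetical names. By ISO-8601 rules, March 1, 2015 falls in week 9 and September 1, 2015 in week 36:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.IsoFields
import java.util.Locale
import scala.util.Try

// Hypothetical stand-in for the example DataFrame: (Timestamp, Number) pairs
val rows: Seq[(String, Int)] =
  Seq(("March 1, 2015", 3), ("September 1, 2015", 4900))

val monthDayYear: DateTimeFormatter =
  DateTimeFormatter.ofPattern("MMMM d, yyyy", Locale.ENGLISH)

// Parse each timestamp and key the row by (calendar year, ISO week), like
// year() + weekofyear() in Spark. Unparseable rows are dropped here; in Spark
// they would get null keys, which never match in the later join anyway.
def toKeyed(rs: Seq[(String, Int)]): Seq[(Int, Int, Int)] =
  rs.flatMap { case (ts, n) =>
    Try(LocalDate.parse(ts, monthDayYear)).toOption.map { d =>
      (d.getYear, d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR), n)
    }
  }
```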
Then merge and aggregate:
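```scala
import org.apache.spark.sql.functions.{avg, count}
// Left join keeps every reference week; weeks without data get count 0 and a null average
weeks2015.join(dfParsed, Seq("year", "week"), "left")
  .groupBy($"year", $"week")
  .agg(count($"Number"), avg($"Number"))
```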
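The effect of the left join followed by `count` and `avg` can be illustrated with plain Scala collections. This is a sketch of the semantics only (hypothetical `fillAndAggregate`, sample keys for 2015 taken from the example rows); unlike Spark's `groupBy`, it returns rows in reference order:

```scala
// Parsed rows as (year, week, number): the two example rows, keyed by ISO week
val parsed: Seq[(Int, Int, Int)] = Seq((2015, 9, 3), (2015, 36, 4900))

// Reference keys: weeks 1..52 of 2015, as in the reference data above
val reference: Seq[(Int, Int)] = (1 to 52).map(w => (2015, w))

// Every reference week appears exactly once. A week with no matching rows gets
// count 0 and no average (None, playing the role of Spark's null).
def fillAndAggregate(
    ref: Seq[(Int, Int)],
    data: Seq[(Int, Int, Int)]): Seq[((Int, Int), Long, Option[Double])] = {
  val byKey: Map[(Int, Int), Seq[Int]] =
    data.groupBy { case (y, w, _) => (y, w) }.map { case (k, v) => k -> v.map(_._3) }
  ref.map { key =>
    val nums = byKey.getOrElse(key, Seq.empty)
    val avg = if (nums.isEmpty) None else Some(nums.map(_.toDouble).sum / nums.size)
    (key, nums.size.toLong, avg)
  }
}
```

Counting the `Number` column rather than rows is what makes empty weeks come out as 0: the left join still produces one row for such a week, but its `Number` is null, and `count` skips nulls.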