I am trying to load a tab-separated file containing two timestamp columns and generate a calculated column holding the difference (in days) between one of those columns and the current timestamp. I have called registerTempTable() on the RDD, which is implicitly converted into a SchemaRDD. After that I hit a wall, because all subsequent operations depend on this datediff calculated field.
Here is what I have done so far. Thanks for the help!
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("CookieSummary")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
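// One row per TSV line; both timestamp columns are read as strings
case class CookieDates(CLPartnerSyncCreateDT: String, CookieSyncRequestDT: String)
val cookies = sc.textFile("/Users/shubhro/Documents/dataFiles/clean/worker1.01012015.1420081201_sub.tsv")
  .map(_.split("\t"))
  .map(p => CookieDates(p(0), p(1)))
// createSchemaRDD (imported above) implicitly turns the RDD of case classes into a SchemaRDD
cookies.registerTempTable("cookies")
// Cast the string columns to TIMESTAMP in SQL
val allCookies = sqlContext.sql("SELECT CAST(CLPartnerSyncCreateDT AS TIMESTAMP), CAST(CookieSyncRequestDT AS TIMESTAMP) FROM cookies")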
allCookies.collect().foreach(println)
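Outside Spark, the calculation itself is small. Below is a minimal plain-Scala sketch of the per-row logic, assuming the timestamps are formatted like "2015-01-01 03:20:01" (an assumption; the actual file format isn't shown). The names CookieAge, daysSince and ageOfLine are hypothetical, and "now" is passed in so the result is deterministic.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper; assumes timestamps look like "2015-01-01 03:20:01".
object CookieAge {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Whole 24-hour days elapsed from `ts` to `now` (partial days are dropped).
  def daysSince(ts: String, now: LocalDateTime): Long =
    ChronoUnit.DAYS.between(LocalDateTime.parse(ts.trim, fmt), now)

  // Splits one TSV line into its columns and returns the age in days
  // of the first column (CLPartnerSyncCreateDT in the question).
  def ageOfLine(line: String, now: LocalDateTime): Long = {
    val cols = line.split("\t")
    daysSince(cols(0), now)
  }
}
```

Inside Spark, the same function could be called from a map over the RDD or wrapped in a UDF.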
Spark 1.5.0 introduced a built-in datediff function:
https://issues.apache.org/jira/browse/SPARK-8185
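One detail worth knowing: the built-in datediff(end, start) counts calendar days between the date parts, ignoring the time of day. That can differ from counting elapsed 24-hour periods. The plain-Scala sketch below mimics that calendar-day behaviour for comparison; it is not Spark's implementation, and the object and method names are hypothetical. In Spark 1.5+, something like df.select(datediff(current_date(), to_date(df("CookieSyncRequestDT")))) with org.apache.spark.sql.functions._ imported should be the DataFrame equivalent (untested here).

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object CalendarDays {
  // Calendar-day difference: the time of day is dropped before counting,
  // similar to SQL datediff(end, start).
  def dateDiff(end: LocalDateTime, start: LocalDateTime): Long =
    ChronoUnit.DAYS.between(start.toLocalDate, end.toLocalDate)

  // Whole elapsed 24-hour periods, for comparison.
  def elapsedDays(end: LocalDateTime, start: LocalDateTime): Long =
    ChronoUnit.DAYS.between(start, end)
}
```

For example, 23:00 on one day to 01:00 the next is 1 calendar day but 0 elapsed days.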
Which version of Spark are you using? I notice you use SchemaRDD, a notion that was replaced by DataFrame in Spark 1.3.0.
What you need here is to define a user-defined function (UDF) and use it in sql(...).
In Spark 1.2.0:
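import java.sql.Timestamp
// The base time you want to diff against (here: the current time)
val base = new Timestamp(System.currentTimeMillis())
// Timestamp has no `-` operator, so subtract epoch millis and convert to whole days
sqlContext.registerFunction("dateDiff", (arg: Timestamp) => (base.getTime - arg.getTime) / (24L * 60 * 60 * 1000))
sqlContext.sql("SELECT dateDiff(CAST(CookieSyncRequestDT AS TIMESTAMP)) FROM cookies")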
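Since the UDF body is just a Scala function on java.sql.Timestamp, it can be checked on its own before registering it. A minimal sketch follows; DateDiffUdf and daysBetween are hypothetical names, and TimeUnit.MILLISECONDS.toDays truncates partial days toward zero.

```scala
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

// Hypothetical UDF body: whole days between a fixed base time and `arg`.
// java.sql.Timestamp has no `-` operator, so epoch millis are compared.
object DateDiffUdf {
  def daysBetween(base: Timestamp)(arg: Timestamp): Long =
    TimeUnit.MILLISECONDS.toDays(base.getTime - arg.getTime)
}
```

It could then be registered with something like sqlContext.registerFunction("dateDiff", DateDiffUdf.daysBetween(base) _), though that exact call is untested here.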
From Spark 1.3.0 onward: see the DataFrame documentation on UDF usage (UDFs are registered through sqlContext.udf.register).