Bamqf

Reputation: 3542

Filter rows by timestamp in DataFrame of SparkR

I want to filter rows of a DataFrame in SparkR by timestamp, with a format like the following:

df <- createDataFrame(sqlContext, data.frame(ID = c(1,2,3),
                                             Timestamp=c('08/01/2014 11:18:30',
                                                         '01/01/2015 12:13:45',
                                                         '05/01/2015 14:17:33')))

Please note that the original schema for the Timestamp column is String. Say I want to filter the timestamps before 03/01/2015 00:00:00; I think there might be two approaches to do this:

One is to mutate the column to a timestamp, as in normal R with dplyr and lubridate:

df %>%
 mutate(Timestamp = mdy_hms(Timestamp)) %>%
 filter(Timestamp < mdy_hms('03/01/2015 00:00:00'))

But I failed to mutate the columns of the DataFrame, since each column is an S4 Column object, not a vector.

The second approach might be to register the DataFrame as a temporary table and then use Spark SQL to handle the timestamp type:

df <- createDataFrame(sqlContext, data.frame(ID = c(1,2,3),
                                             Timestamp=c('08/01/2014 11:18:30',
                                                         '01/01/2015 12:13:45',
                                                         '05/01/2015 14:17:33')))
registerTempTable(df, 'df')
head(sql(sqlContext, 'SELECT * FROM df WHERE Timestamp < "03/01/2015 00:00:00"'))

But since this is still a string comparison, it gives the wrong result. What would be the correct way to do this?

Upvotes: 0

Views: 1301

Answers (1)

zero323

Reputation: 330113

Spark 1.6+

You should be able to use the unix_timestamp function and the standard SQLContext:

ts <- unix_timestamp(df$Timestamp, 'MM/dd/yyyy HH:mm:ss') %>%
  cast("timestamp")

df %>% 
   where(ts <  cast(lit("2015-03-01 00:00:00"), "timestamp"))
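For completeness, the same Spark 1.6+ filter can be written without magrittr, using plain SparkR function calls (a sketch assuming the df and sqlContext created in the question):

```r
# Parse the string column using the question's MM/dd/yyyy HH:mm:ss format,
# cast it to a proper timestamp, and filter on it.
ts <- cast(unix_timestamp(df$Timestamp, "MM/dd/yyyy HH:mm:ss"), "timestamp")
filtered <- where(df, ts < cast(lit("2015-03-01 00:00:00"), "timestamp"))
head(filtered)
```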

Spark < 1.6

This should do the trick:

sqlContext <- sparkRHive.init(sc)

query <- "SELECT * FROM df
    WHERE unix_timestamp(Timestamp, 'MM/dd/yyyy HH:mm:ss') < 
          unix_timestamp('2015-03-01 00:00:00')" # yyyy-MM-dd HH:mm:ss 

df <- createDataFrame(sqlContext, ...)
registerTempTable(df, 'df')

head(sql(sqlContext, query))

##   ID           Timestamp
## 1  1 08/01/2014 11:18:30
## 2  2 01/01/2015 12:13:45

Please note that the choice of the context is important here. Since unix_timestamp is a Hive UDF, the standard SQLContext you get by default in SparkR won't work here.

Upvotes: 3
