Reputation: 5461
I am converting plain R code into SparkR to make efficient use of Spark.
I have the column CloseDate shown below.
CloseDate
2011-01-08
2011-02-07
2012-04-07
2013-04-18
2011-02-07
2010-11-10
2010-12-09
2013-02-18
2010-12-09
2011-03-11
2011-04-10
2013-06-19
2011-04-10
2011-01-06
2011-02-06
2013-04-16
2011-02-06
2015-09-25
2015-09-25
2010-11-10
I want to count the number of times the date has increased or decreased. I have the R code below to do that.
dateChange <- function(closeDate, dir){
  close_dt <- as.Date(closeDate)
  num_closedt_out <- 0   # rows where the date moved later than the previous row
  num_closedt_in  <- 0   # rows where the date moved earlier than the previous row
  for (j in 1:length(close_dt)) {
    curr <- close_dt[j]
    if (j > 1) {
      prev <- close_dt[j - 1]
    } else {
      prev <- curr
    }
    if (curr > prev) {
      num_closedt_out <- num_closedt_out + 1
    } else if (curr < prev) {
      num_closedt_in <- num_closedt_in + 1
    }
  }
  if (dir == "inc") {
    ret <- num_closedt_out
  } else if (dir == "dec") {
    ret <- num_closedt_in
  }
  ret
}
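For example, on a plain character vector this is called as in the sketch below (the vector is just the first few sample CloseDate values from above):

# Illustrative call on a plain character vector (base R, not SparkR)
closeDate <- c("2011-01-08", "2011-02-07", "2012-04-07", "2013-04-18", "2011-02-07")
dateChange(closeDate, "inc")   # dates that moved later than the previous row -> 3
dateChange(closeDate, "dec")   # dates that moved earlier than the previous row -> 1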
I tried to use a SparkR df$col here. Since Spark executes the code lazily, I didn't get the value of the length during execution and got a NaN error.
Here is the modified code that I tried.
DateDirChanges <- function(closeDate, dir){
  close_dt <- to_date(closeDate)
  num_closedt_out <- 0
  num_closedt_in  <- 0
  col_len <- SparkR::count(close_dt)
  for (j in 1:col_len) {
    curr <- close_dt[j]
    if (j > 1) {
      prev <- close_dt[j - 1]
    } else {
      prev <- curr
    }
    if (curr > prev) {
      num_closedt_out <- num_closedt_out + 1
    } else if (curr < prev) {
      num_closedt_in <- num_closedt_in + 1
    }
  }
  if (dir == "inc") {
    ret <- num_closedt_out
  } else if (dir == "dec") {
    ret <- num_closedt_in
  }
  ret
}
How can I get the length of a column during the execution of this code? Or is there any better way to do it?
Upvotes: 0
Views: 535
Reputation: 330413
You cannot, because a Column simply has no length. Unlike what you may expect coming from R, columns don't represent data but SQL expressions and specific data transformations. Moreover, the order of values in a Spark DataFrame is arbitrary, so you cannot simply look at neighbouring rows.
If your data can be partitioned, as in your previous question, you can use window functions in the same way as I've shown in the answer to that question. Otherwise there is no efficient way to handle this using SparkR alone.
Assuming there is a way to determine the order (required) and you can partition your data (desirable for reasonable performance), all you need is something like this:
SELECT
  CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) gt,
  CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) lt,
  CAST(LAG(CloseDate, 1) OVER w = CloseDate AS INT) eq
FROM DF
WINDOW w AS (
  PARTITION BY partition_col ORDER BY order_col
)
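From SparkR this could be wired up roughly as in the sketch below; the view name DF, the partition_col and order_col columns, and the final aggregation (summing the flags) are assumptions on top of the query above, and the API shown is the Spark 2.x one:

library(SparkR)

# Assumes an existing SparkDataFrame `df` with CloseDate, partition_col and order_col.
createOrReplaceTempView(df, "DF")

flags <- sql("
  SELECT
    CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) AS gt,
    CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) AS lt,
    CAST(LAG(CloseDate, 1) OVER w = CloseDate AS INT) AS eq
  FROM DF
  WINDOW w AS (PARTITION BY partition_col ORDER BY order_col)
")

# gt flags rows where the previous date is later (a decrease),
# lt flags rows where it is earlier (an increase); sum the flags to get totals.
head(agg(flags,
         num_closedt_out = sum(flags$lt),
         num_closedt_in  = sum(flags$gt)))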
Upvotes: 2