Stan_1989

Reputation: 23

Why are Window functions (Lag) not working in SparkR?

I am fairly new to Spark and SparkR for that matter and might have some basic questions.

The objective of this exercise was to implement window functions (lead, lag, rank, etc.) in SparkR.

I referred to the link below and the Databricks post mentioned there, but to no avail:

SparkSQL - Lag function?

Code snippet I used:

Initialize sqlContext and register the data frame as a temp table using registerTempTable
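For reference, a minimal sketch of that setup step using the Spark 1.x SparkR API (the input path, file format, and table name here are placeholders, not taken from the original code):

```r
# Sketch of the setup described above (SparkR, Spark 1.x API).
# The source path and format are placeholders.
library(SparkR)

sc <- sparkR.init()                  # initialize the Spark context
sqlContext <- sparkRSQL.init(sc)     # initialize the SQL context

# Load a DataFrame and register it so it is visible to SQL queries
input_data <- read.df(sqlContext, "path/to/input.json", source = "json")
registerTempTable(input_data, "input_data")
```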

output_data<-SparkR::sql(sqlContext, "select *,lag(type) over(partition by key order by key) as lag_type from input_data")

The error we faced was:

failure: ``union'' expected but `(' found

Another suggestion I found was to use a HiveContext rather than a SQLContext, as SQLContext might not allow all functionality.

In that approach, initializing a HiveContext and attempting to run a HiveQL query to do the same gave us an error saying:

cannot find table named input_table

Question: do we need to run some command similar to registerTempTable to give the HiveContext access to the table?

saveAsTable might be an option, but from what I've read it would persist the data to storage (S3 in our case) rather than keeping it in-memory on the cluster.

Would appreciate any help on this! Thanks!

Upvotes: 2

Views: 720

Answers (1)

zero323

Reputation: 330413

Let's prepare an input data.frame using the freeny dataset.

ldf <- freeny

# Extract year and quarter
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))

# Clean column names
colnames(ldf) <- gsub("\\.", "_",  colnames(ldf))

# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL

head(ldf)


##         y lag_quarterly_revenue price_index income_level yr   qr
## 1 8.79236               8.79636     4.70997      5.82110 1962      1
## 2 8.79137               8.79236     4.70217      5.82558 1962      2
## 3 8.81486               8.79137     4.68944      5.83112 1962      3
## 4 8.81301               8.81486     4.68558      5.84046 1963      0
## 5 8.90751               8.81301     4.64019      5.85036 1963      1
## 6 8.93673               8.90751     4.62553      5.86464 1963      2

Another suggestion I found was to use a HiveContext rather than a SQLContext, as SQLContext might not allow all functionality.

This is correct. Most of the advanced functions are supported only by the HiveContext, while the default one is SQLContext. First of all, you have to make sure that your Spark version has been built with Hive support. This is true of the binaries available from the Spark downloads page, but if you build from source be sure to use the -Phive flag.
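For example, when building from source (a sketch based on the standard Spark Maven build; profiles other than -Phive depend on your environment and Spark version):

```shell
# Build Spark from source with Hive support enabled (-Phive).
# -Phive-thriftserver is optional; -DskipTests speeds up the build.
build/mvn -Phive -Phive-thriftserver -DskipTests clean package
```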

hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)

## root
##  |-- y: double (nullable = true)
##  |-- lag_quarterly_revenue: double (nullable = true)
##  |-- price_index: double (nullable = true)
##  |-- income_level: double (nullable = true)
##  |-- yr: integer (nullable = true)
##  |-- qr: integer (nullable = true)

Initialize sqlContext and register the data frame as a temp table using registerTempTable

That's right as well. To be able to use the sql command, you have to register a table.

registerTempTable(sdf, "sdf")

Remember that a DataFrame is bound to the context which was used to create it.

head(tables(hiveContext))

##  tableName isTemporary
## 1       sdf        TRUE

head(tables(sqlContext))

## [1] tableName   isTemporary
## <0 rows> (or 0-length row.names)

Finally, an example query:

query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
          LAG(y) OVER (ORDER BY yr, qr) AS new_lag 
          FROM sdf"

sql(hiveContext, query)

##     yr qr       y old_lag new_lag
## 1 1962  1 8.79236 8.79636      NA
## 2 1962  2 8.79137 8.79236 8.79236
## 3 1962  3 8.81486 8.79137 8.79137
## 4 1963  0 8.81301 8.81486 8.81486
## 5 1963  1 8.90751 8.81301 8.81301
## 6 1963  2 8.93673 8.90751 8.90751

Upvotes: 1
