Reputation: 23
I am fairly new to Spark, and to SparkR for that matter, and might have some basic questions.
The objective of this exercise was to implement window functions (lead, lag, rank, etc.) in SparkR.
I referred to the link below and the Databricks post mentioned there, but to no avail.
Code snippet I used:
Initialize sqlContext and register the data frame as a temp table using registerTempTable
output_data <- SparkR::sql(sqlContext, "select *, lag(type) over (partition by key order by key) as lag_type from input_data")
The error we faced was:
failure: ``union'' expected but `(' found
Another suggestion I found was to use a HiveContext rather than a SQLContext, as a SQLContext might not allow all functionality.
In that approach, initializing a HiveContext and attempting to run the same query as HiveQL gave us an error saying:
cannot find table named input_table
Question: do we need to run some command similar to registerTempTable to give the HiveContext access to the table?
saveAsTable might be an option, but from what I have read it would persist the data to S3 storage rather than keeping it in memory on the cluster.
Would appreciate any help on this! Thanks!
Upvotes: 2
Views: 720
Reputation: 330413
Let's prepare an input data.frame using the freeny dataset.
ldf <- freeny
# Extract year and quarter
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))
# Clean column names
colnames(ldf) <- gsub("\\.", "_", colnames(ldf))
# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL
head(ldf)
## y lag_quarterly_revenue price_index income_level yr qr
## 1 8.79236 8.79636 4.70997 5.82110 1962 1
## 2 8.79137 8.79236 4.70217 5.82558 1962 2
## 3 8.81486 8.79137 4.68944 5.83112 1962 3
## 4 8.81301 8.81486 4.68558 5.84046 1963 0
## 5 8.90751 8.81301 4.64019 5.85036 1963 1
## 6 8.93673 8.90751 4.62553 5.86464 1963 2
Another suggestion I found was to use a HiveContext rather than a SQLContext, as a SQLContext might not allow all functionality.
This is correct; most of the advanced functions are supported only by the HiveContext, while the default one is a SQLContext. First of all you have to make sure that your Spark version has been built with Hive support. This is true of the binaries available from the Spark downloads page, but if you build from source be sure to use the -Phive flag.
hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)
## root
## |-- y: double (nullable = true)
## |-- lag_quarterly_revenue: double (nullable = true)
## |-- price_index: double (nullable = true)
## |-- income_level: double (nullable = true)
## |-- yr: integer (nullable = true)
## |-- qr: integer (nullable = true)
Initialize sqlContext and register the data frame as a temp table using registerTempTable
That's right as well. To be able to use the sql command you have to register a table.
registerTempTable(sdf, "sdf")
Remember that a DataFrame is bound to the context which was used to create it.
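For the comparison below, assume a plain sqlContext created with the default initializer (where sc is the same SparkContext used above):

sqlContext <- sparkRSQL.init(sc)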
head(tables(hiveContext))
## tableName isTemporary
## 1 sdf TRUE
head(tables(sqlContext))
## [1] tableName isTemporary
## <0 rows> (or 0-length row.names)
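This is also the likely cause of the cannot find table named input_table error from the question: a table registered through one context is not visible from the other. As a sketch (the exact error text varies between Spark versions):

# sdf is registered only with hiveContext, so querying it through
# the plain sqlContext fails with a "table not found" style error:
# sql(sqlContext, "SELECT * FROM sdf")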
Finally, an example query:
query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
LAG(y) OVER (ORDER BY yr, qr) AS new_lag
FROM sdf"
sql(hiveContext, query)
## yr qr y old_lag new_lag
## 1 1962 1 8.79236 8.79636 NA
## 2 1962 2 8.79137 8.79236 8.79236
## 3 1962 3 8.81486 8.79137 8.79137
## 4 1963 0 8.81301 8.81486 8.81486
## 5 1963 1 8.90751 8.81301 8.81301
## 6 1963 2 8.93673 8.90751 8.90751
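The same approach extends to the partitioned form used in the question. Here is a sketch partitioning by yr, so the lag restarts within each year (lag_within_yr is just an illustrative alias):

query_partitioned <- "SELECT yr, qr, y,
    LAG(y) OVER (PARTITION BY yr ORDER BY qr) AS lag_within_yr
    FROM sdf"
head(sql(hiveContext, query_partitioned))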
Upvotes: 1