I have a SparkR script that loads some libraries, and also defines functions from other sourced scripts. The head of it looks like this:
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
library(devtools)
library(JC.utilities) # user-created
source("path/to/clean_data.R")
where clean_data.R uses some functions found in the JC.utilities library, like this:
clean_data <- function(df) {
...
return(JC.utilities::reformat(df))
}
In base R this is no problem: sourcing the script that defines this function puts the function and the library in the same scope. But in Spark, I get this error message when I call clean_data():
Error in loadNamespace(name) : there is no package called ‘JC.utilities’
Two questions:
Upvotes: 1
Reputation: 10406
You loaded the library on the driver, but not on the executors, which run in separate R processes with their own workspaces.
You need to load JC.utilities inside the function that you apply to your dataframe.
If the library is not installed on the executors at all, you will need to install it there, or you may try doing this.
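The two situations described above (package not installed versus package installed but not loaded) produce different errors, which helps tell them apart. The sketch below reproduces both in a plain local R session, without Spark. `no.such.pkg` is a made-up name assumed not to be installed, and `tools` stands in for any package that is installed but not attached; the exact message text assumes an English locale.

```r
# Case 1: the package is not installed at all. `::` calls loadNamespace(),
# which fails with "there is no package called ...", as in the question.
err_missing <- tryCatch(
  no.such.pkg::reformat(1),   # hypothetical package name, assumed absent
  error = function(e) conditionMessage(e)
)

# Case 2: the package is installed but not attached in this R process.
# An unqualified call then fails with "could not find function ...".
call_unattached <- function() {
  file_ext("data.csv")        # exported by 'tools', not attached by default
}
err_unattached <- tryCatch(
  call_unattached(),
  error = function(e) conditionMessage(e)
)

# A namespace-qualified call works whenever the package is installed,
# even if library() was never called in this process.
ext <- tools::file_ext("data.csv")

print(err_missing)
print(err_unattached)
print(ext)
```

Read this way, the `loadNamespace` error in the question suggests that the executors cannot find the package at all, so calling `library()` inside the function would not be enough on its own there.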
I devised a minimal example to illustrate my point with the library stringr and its function str_length:
> d<-data.frame(a=c("a", "abcd", "bb"), b=c(1,2,3))
> df <- as.DataFrame(d)
Let's try dapplyCollect to check that it works with base R code:
> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1) })
[,1] [,2] [,3]
[1,] "a" "1" "2"
[2,] "abcd" "2" "5"
[3,] "bb" "3" "10"
It works! Then let's try to count the number of characters of the 1st column (I could use nchar but the idea is to demonstrate how to use a library).
> library(stringr)
> str_length("abcdef")
[1] 6
It works fine on the driver. Let's try with Spark:
> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
17/11/08 18:55:17 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 10)
org.apache.spark.SparkException: R computation failed with
Error in cbind(x$a, x$b, x$b * x$b + 1, str_length(x$a)) :
could not find function "str_length"
As I explained, loading the library on the driver does not load it on the workers, yet that is where I need it. Let's now load the library within dapplyCollect:
> dapplyCollect(df, function(x) {library(stringr); cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
[,1] [,2] [,3] [,4]
[1,] "a" "1" "2" "1"
[2,] "abcd" "2" "5" "4"
[3,] "bb" "3" "10" "2"
And there you go.
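A variation on the same idea is to make the per-partition function check for the package itself and fail with an explicit message when the package is missing. This is only a sketch: `count_chars` is a hypothetical name, and the local check at the end runs on a plain data.frame rather than through Spark.

```r
# Hypothetical per-partition function: verifies that stringr can be loaded
# in the current R process before using it, and stops with a clear message
# otherwise.
count_chars <- function(x) {
  if (!requireNamespace("stringr", quietly = TRUE)) {
    stop("stringr is not installed in this R process", call. = FALSE)
  }
  # Namespace-qualified call, so no library() call is needed here.
  cbind(x$a, x$b, stringr::str_length(x$a))
}

# Local check on a plain data.frame. The result is either the character
# matrix or the error message, depending on whether stringr is installed.
d <- data.frame(a = c("a", "abcd", "bb"), b = c(1, 2, 3),
                stringsAsFactors = FALSE)
result <- tryCatch(count_chars(d), error = function(e) conditionMessage(e))
print(result)
```

With the SparkDataFrame `df` from the example above, this would be passed as `dapplyCollect(df, count_chars)`. If the package is missing on an executor, the custom message should then appear in the task failure.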
Upvotes: 2