data princess

Reputation: 1158

Library load not extending to functions in Spark

I have a SparkR script that loads some libraries and defines functions by sourcing other scripts. The head of it looks like this:

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) 
library(devtools)   
library(JC.utilities)         # user-created

source("path/to/clean_data.R")

where clean_data.R uses some functions from the JC.utilities library, like this:

clean_data <- function(df) {
  ... 
  return(JC.utilities::reformat(df))
}

In base R this is no problem: sourcing the R script in which this function is defined means the library and the function end up in the same scope. But in Spark, I get this error message when I call clean_data():

Error in loadNamespace(name) : there is no package called ‘JC.utilities’

Two questions:

  1. Why doesn't Spark seem to remember that I've already loaded this library?
  2. Is there a way around this issue without having to define a bunch of things explicitly?

Upvotes: 1

Views: 597

Answers (1)

Oli

Reputation: 10406

You loaded the library on the driver, but not on the executors, which run in their own separate R workspaces.

You need to load JC.utilities within the function that you apply to your dataframe.

If the library is not installed on the executors at all, you will need to install it there first.
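
Applied to the question's setup, a minimal sketch could look like this (clean_data and JC.utilities::reformat come from the question; spark_df and the dapplyCollect call are my assumptions about how clean_data gets applied):

clean_data <- function(df) {
  # Attach the package inside the function, so each executor's R
  # process loads it before reformat() is called.
  library(JC.utilities)
  return(JC.utilities::reformat(df))
}

# Hypothetical call: dapplyCollect ships clean_data (including the
# library() call inside it) to every worker.
result <- dapplyCollect(spark_df, clean_data)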

I devised a minimal example to illustrate my point with the library stringr and its function str_length:

> d <- data.frame(a = c("a", "abcd", "bb"), b = c(1, 2, 3))
> df <- as.DataFrame(d)

Let's try dapply to see that it works with base R code:

> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1) })
     [,1]   [,2] [,3]
[1,] "a"    "1"  "2"
[2,] "abcd" "2"  "5"
[3,] "bb"   "3"  "10"

It works! Now let's count the number of characters in the first column (I could use nchar, but the idea is to demonstrate how to use a library).

> library(stringr)
> str_length("abcdef")
[1] 6

It works fine on the driver. Let's try it with Spark:

> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
17/11/08 18:55:17 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 10)
org.apache.spark.SparkException: R computation failed with
 Error in cbind(x$a, x$b, x$b * x$b + 1, str_length(x$a)) : 
  could not find function "str_length"

As I explained, loading the library on the driver does not load it on the workers, yet that is where I need it. Let's now load the library within the function passed to dapplyCollect:

> dapplyCollect(df, function(x) {library(stringr); cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
     [,1]   [,2] [,3] [,4]
[1,] "a"    "1"  "2"  "1" 
[2,] "abcd" "2"  "5"  "4" 
[3,] "bb"   "3"  "10" "2"

And there you go.
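
One last note: dapplyCollect brings the whole result back to the driver. To keep the result distributed, the same pattern works with dapply, which additionally requires an output schema. Here is a sketch on the same data (the schema's column names and types are my assumptions):

> schema <- structType(structField("a", "string"), structField("len", "integer"))
> df2 <- dapply(df, function(x) {
+   library(stringr)  # loaded on each executor
+   data.frame(a = x$a, len = str_length(x$a), stringsAsFactors = FALSE)
+ }, schema)
> collect(df2)   # row order is not guaranteed in general
     a len
1    a   1
2 abcd   4
3   bb   2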

Upvotes: 2
