Reputation: 841
This might be one for the philosophers... (or @Steve Weston or @Martin Morgan)
I've been having some issues with memory leaks when using parLapply, and after digging through enough threads on the matter, I think this question is well warranted. I've taken some time to try and figure this one out, and while I've got an inkling of a clue as to why the observed behavior happens, I'm lost as to how to resolve it.
Consider the following as a sourced script, saved as: parallel_question.R
rf.parallel<-function(n=10){
  library(parallel)
  library(randomForest)
  rf.form<- as.formula(paste("Final", paste(c('x','y','z'), collapse = "+"), sep = " ~ "))
  rf.df<-data.frame(Final=runif(10000),y=runif(10000),x=runif(10000),z=runif(10000))
  rf.df.list<-split(rf.df,rep(1:n,nrow(rf.df))[1:nrow(rf.df)])
  cl<-makeCluster(n)
  rf.list<-parLapply(cl,rf.df.list,function(x,rf.form,n){
    randomForest::randomForest(rf.form,x,ntree=100,nodesize=10, norm.votes=FALSE)},rf.form,n)
  stopCluster(cl)
  return(rf.list)
}
We source and run the script with:
scrip.loc<-"G:\\Scripts_Library\\R\\Stack_Answers\\parallel_question.R"
source(scrip.loc)
rf.parallel(n=10)
Fairly straightforward... we ran several random forests in parallel. Seems to be memory efficient. We could combine them later, or do something else. Handy. Nice. Well behaved.
Now consider the following script, saved as parallel_question_2.R
rf.parallel_2<-function(n=10){
  library(parallel)
  library(magrittr)
  library(randomForest)
  rf.form<- as.formula(paste("Final", paste(c('x','y','z'), collapse = "+"), sep = " ~ "))
  rf.df<-data.frame(Final=runif(10000),y=runif(10000),x=runif(10000),z=runif(10000))
  large.list<-rep(rf.df,10000)
  rf.df.list<-split(rf.df,rep(1:n,nrow(rf.df))[1:nrow(rf.df)])
  cl<-makeCluster(n)
  rf.list<-parLapply(cl,rf.df.list,function(x,rf.form,n){
    randomForest::randomForest(rf.form,x,ntree=100,nodesize=10, norm.votes=FALSE)},rf.form,n)
  stopCluster(cl)
  return(rf.list)
}
In this second script, we've got a large list in our sourced environment. We are not calling the list or bringing it into our parallel function. I've sized the list so that it is likely to cause problems even on a 32 GB machine.
scrip.loc<-"G:\\Scripts_Library\\R\\Stack_Answers\\parallel_question_2.R"
source(scrip.loc)
rf.parallel_2(n=10)
When we run the second script, we end up carrying around an additional ~3 GB (the size of our large list) multiplied by the number of workers in the cluster. If we run the contents of the second script in a non-sourced environment, this does not happen; rather, we get one ~3 GB list, the parallelized function runs without issue, and that's the end of it.
So... how/why are the worker environments taking unnecessary variables from the parent environment? Why does it only happen in sourced scripts? How can I mitigate this when I have a large, complex sourced script with sub-sections that are parallelized (but which may be carrying around 3-10 GB of intermediate data)?
Relevant or similar threads:
Using parLapply and clusterExport inside a function
clusterExport, environment and variable scoping
Upvotes: 2
Views: 777
Reputation: 1
Towards the end of processing I was passing close to 50 GB of data back into the parLapply, which was not... ideal.
I ended up creating a new function that called the parLapply. I placed it inside my nested loop, created a new environment there, set the parent environment to the .GlobalEnv, passed only the variables needed to the new environment, and then passed that environment to clusterExport.
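A minimal sketch of that approach, not the exact code I used: the names scale_chunk, run_chunks, big_caller and scale_factor are hypothetical, and a numeric vector stands in for the large intermediate data.

```r
library(parallel)

# Hypothetical worker function. Its enclosing environment is set to the
# global environment explicitly, so nothing from a calling frame is
# serialized along with it.
scale_chunk <- function(chunk) chunk * scale_factor
environment(scale_chunk) <- globalenv()

# Hypothetical wrapper that calls parLapply() and exports only what is needed.
run_chunks <- function(cl, chunks, scale_factor) {
  # New environment whose parent is the global environment.
  export_env <- new.env(parent = globalenv())
  assign("scale_factor", scale_factor, envir = export_env)
  # Copy only the listed variable into each worker's global environment.
  clusterExport(cl, "scale_factor", envir = export_env)
  parLapply(cl, chunks, scale_chunk)
}

big_caller <- function() {
  big <- numeric(1e6)  # stand-in for large intermediate data; never exported
  cl <- makeCluster(2)
  on.exit(stopCluster(cl))
  run_chunks(cl, list(1:3, 4:6), scale_factor = 2)
}

res <- big_caller()
```

Here `big` lives in the frame of big_caller(), but the workers only ever receive scale_chunk and scale_factor.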
For details on environments, I'd recommend this blog post. Also, I found the Parallel R book by Ethan McCallum and Stephen Weston to be helpful. On pages 15-17, there is a discussion on this issue from the 'snow' package.
Upvotes: 0
Reputation: 46856
parLapply(cl, X, FUN, ...) applies FUN to each element of X. The worker needs to know FUN, so this is serialized and sent to the worker. What is an R function? It's the code that defines the function, and the environment in which the function was defined. Why the environment? Because in R it's legal to reference variables defined outside of FUN, e.g.,
f = function(y) x + y
x = 1; f(1)
## [1] 2
As a second complication, R allows the function to update variables outside the function:
f = function(y) { x <<- x + 1; x + y }
x = 1; f(1)
## [1] 3
In the above, we can imagine that we could figure out which parts of the environment of f() need to be seen (only the variable x), but in general this kind of analysis is not possible without actually evaluating the function, e.g., f = function(y, name) get(name) + y; x = 1; f(1, "x").
So for FUN to be evaluated on the worker, the worker needs to know both the definition of FUN and the content of the environment FUN was defined in. R lets the worker know about FUN by using serialize(). The consequence is easy to see:
f = function(n) { x = sample(n); length(serialize(function() {}, NULL)) }
f(1)
## [1] 754
f(10)
## [1] 1064
f(100)
## [1] 1424
Larger objects in the environment result in more information sent to / used by the worker.
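As a rough way to check this for any function before sending it to a cluster, one can list what its enclosing environment holds and compare serialized sizes. This is only a sketch: make_worker, worker and big are hypothetical names, and re-homing the closure in the global environment is safe only when the function uses no free variables from its original frame.

```r
# Hypothetical factory that creates a closure next to a large local object.
make_worker <- function() {
  big <- numeric(1e5)   # stand-in for large intermediate data
  function(v) v + 1     # never references `big`, but still encloses it
}
worker <- make_worker()

# Names of the objects the closure drags along when serialized.
captured <- ls(environment(worker))
size_before <- length(serialize(worker, NULL))

# Point the closure at the global environment instead of the factory's frame.
environment(worker) <- globalenv()
size_after <- length(serialize(worker, NULL))
```

In this sketch `captured` contains only "big", and `size_after` is far smaller than `size_before` because the 1e5-element vector is no longer part of what gets serialized.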
If you think about it, the description so far would mean that the entire R session should be serialized to the worker (or to disk, if serialize() were being used to save objects) -- the environment of the implicit function in f() includes the body of f(), but also the environment of f(), which is the global environment, and the environment of the global environment, which is the search path... (check out environment(f) and parent.env(.GlobalEnv)). R has an arbitrary rule that it stops at the global environment. So instead of using an implicit function() {}, define this in the .GlobalEnv:
g = function() {}
f = function(n) { x = sample(n); length(serialize(g, NULL)) }
f(1)
## [1] 592
f(1000)
## [1] 592
Note also that this has consequences for what functions can be serialized. For instance, if g() were serialized in the code below it would 'know' about x
f = function(y) { x = 1; g = function(y) x + y; g(y) }
f(1)
## [1] 2
but here it does not -- it knows about the symbols in the environment(s) it was defined in but not about the symbols in the environment it was called from.
rm(x)
g = function(y) x + y
f = function(y) { x = 1; g() }
f()
## Error in g() : object 'x' not found
In your script, you could compare
cl = makeCluster(2)
f = function(n) {
    x = sample(n)
    parLapply(
        cl, 1,
        function(...)
            length(serialize(environment(), NULL))
    )
}
f(1)[[1]]
## [1] 256
f(1000)[[1]]
## [1] 4252
with
g = function(...) length(serialize(environment(), NULL))
f = function(n) {
    x = sample(n)
    parLapply(cl, 1, g)
}
f(1)[[1]]
## [1] 150
f(1000)[[1]]
## [1] 150
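Applied to the script in the question, the same idea might look like the sketch below. It is an illustration under assumptions rather than a drop-in fix: fit_chunk and fit_parallel are hypothetical names, lm() stands in for randomForest() so that the example needs no extra packages, and the data are much smaller than in the question. The formula is passed as text because a formula built with as.formula() inside a function records that function's frame as its environment, so it would carry large.list along in the same way a closure does.

```r
library(parallel)

# Hypothetical worker, enclosed by the global environment so that
# serialize() sends only its code to the workers.
fit_chunk <- function(chunk, form_text) {
  # Build the formula on the worker from plain text.
  lm(as.formula(form_text), data = chunk)  # lm() is a stand-in for randomForest()
}
environment(fit_chunk) <- globalenv()

fit_parallel <- function(n = 2) {
  df <- data.frame(Final = runif(1000), x = runif(1000),
                   y = runif(1000), z = runif(1000))
  large.list <- rep(df, 100)  # stand-in for the large intermediate object
  chunks <- split(df, rep(1:n, length.out = nrow(df)))
  cl <- makeCluster(n)
  on.exit(stopCluster(cl))
  # Only fit_chunk, the chunks and the formula text are sent to the workers.
  parLapply(cl, chunks, fit_chunk, form_text = "Final ~ x + y + z")
}

fits <- fit_parallel(2)
```

The design choice is that nothing passed to parLapply() (neither the function nor its extra arguments) has the frame of fit_parallel() as its environment, so large.list stays on the master.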
Upvotes: 8