Reputation: 183
I'm currently developing an R package that will use parallel computing to solve some tasks, by means of the "parallel" package.
I'm getting some really awkward behavior when using clusters defined inside functions of my package: the parLapply function assigns a job to a worker and waits for it to finish before assigning a job to the next worker. Or at least this is what appears to be happening, based on the log file "cluster.log" and the list of running processes in the unix shell.
Below is a mockup version of the original function declared inside my package:
.parSolver <- function(varMatrix, var1) {
    no_cores <- detectCores()
    # Rows in varMatrix
    rows <- 1:nrow(varMatrix)
    # Split rows into n parts
    n <- no_cores
    parts <- split(rows, cut(rows, n))
    # Initiate cluster
    cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
    on.exit(stopCluster(cl))  # shut the workers down even if an error occurs
    clusterEvalQ(cl, library(raster))
    clusterExport(cl, "varMatrix", envir = environment())
    clusterExport(cl, "var1", envir = environment())
    rParts <- parLapply(cl = cl, X = 1:n, fun = function(x) {
        part <- rasterize(varMatrix[parts[[x]], ], raster(var1), .....)
        print(x)
        return(part)
    })
    do.call(merge, rParts)
}
NOTES:
The weird part to me is that if I execute the exact same code from the function parSolver in the global environment, everything works smoothly: all workers take one job at the same time and the task completes in no time. However, if I do something like:
library(myPackage)
varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)
the described problem appears.
It appears to be a load-balancing problem; however, that does not explain why it works fine in one situation and not in the other.
Am I missing something here? Thanks in advance.
Upvotes: 4
Views: 2047
Reputation: 21
Complement to Steve Weston's excellent answer: if you must, you can define a function f0 within a function f1 and then run f0 in parallel within f1, but you have to set f0's environment properly:
f1 <- function(x, cl) {
    # ...
    f0 <- function(y) { ... }
    # Rebind f0's environment so parLapply doesn't serialize f1's locals with it
    environment(f0) <- .GlobalEnv
    res <- parLapply(cl, x, f0)
    # ...
}
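For concreteness, here is a minimal runnable version of that pattern; the names f0 and f1 match the sketch above, but the toy workload and the large local object are my own illustrations:

library(parallel)

f1 <- function(x, cl) {
    big_local <- rnorm(1e6)  # large object we do NOT want shipped to workers
    f0 <- function(y) y^2
    # Without the next line, f0's closure would capture f1's environment
    # (including big_local) and parLapply would serialize all of it:
    environment(f0) <- .GlobalEnv
    parLapply(cl, x, f0)
}

cl <- makePSOCKcluster(2)
res <- f1(1:10, cl)
stopCluster(cl)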
This "almost" always works for me, in the sense that I get runtimes close to if not identical to what I would get if running the function's code in the main environment. I say almost because I have encountered at least one case where it didn't which I never managed to solve.
I also second the advice not to export the whole of varMatrix to each worker. A (much) faster solution is to break it into as many chunks as you have workers, serialize those to temporary files on disk, and then have each worker deserialize only its own chunk.
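A minimal sketch of that idea, assuming the workers share a filesystem with the master (true for a local PSOCK cluster); the function name parSolverDisk, the toy workload, and the use of saveRDS/readRDS are my own choices, not code from the question:

library(parallel)

parSolverDisk <- function(cl, varMatrix, var1) {
    n <- length(cl)
    parts <- splitIndices(nrow(varMatrix), n)
    # Serialize each chunk to its own temporary file
    files <- vapply(seq_len(n), function(i) {
        f <- tempfile(fileext = ".rds")
        saveRDS(varMatrix[parts[[i]], , drop = FALSE], f)
        f
    }, character(1))
    # Only the (tiny) file paths travel over the socket;
    # each worker deserializes just its own chunk
    rParts <- clusterApply(cl, files, function(f, var1) {
        mat <- readRDS(f)
        mat * var1  # stand-in for the real per-chunk work
    }, var1)
    unlink(files)  # clean up the temporary files
    do.call(rbind, rParts)
}

cl <- makePSOCKcluster(2)
r <- parSolverDisk(cl, matrix(1:20, 10, 2), 2)
stopCluster(cl)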
Upvotes: 0
Reputation: 19677
I don't think parLapply is running sequentially. More likely, it's just running inefficiently, making it appear to run sequentially.
I have a few suggestions to improve it:
- Don't define the worker function inside parSolver
- Don't export all of varMatrix to each worker
- Create the cluster outside of parSolver
The first point is important because, as your example now stands, all of the variables defined in parSolver will be serialized along with the anonymous worker function and sent to the workers by parLapply. By defining the worker function outside of any function, the serialization won't capture any unwanted variables.
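A quick illustration of that capture, not taken from the answer: a closure created inside a function keeps a reference to that function's whole environment, so serializing the closure serializes everything in it:

makeClosure <- function() {
    big <- rnorm(1e6)       # local variable that f never uses
    f <- function(x) x + 1
    f
}
f <- makeClosure()
# The serialized closure includes its enclosing environment,
# so 'big' (about 8 MB) travels along with it:
length(serialize(f, NULL))
# 'big' is still reachable through f's environment:
head(environment(f)$big)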
The second point avoids unnecessary socket I/O and uses less memory, making the code more scalable.
Here's a fake but self-contained example, similar to yours, that demonstrates my suggestions:
# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
    library(raster)
    mat * var1
}

parSolver <- function(cl, varMatrix, var1) {
    parts <- splitIndices(nrow(varMatrix), length(cl))
    varMatrixParts <- lapply(parts, function(i) varMatrix[i, , drop = FALSE])
    rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
    do.call(rbind, rParts)
}

library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)
stopCluster(cl)
Note that this takes advantage of the clusterApply function to iterate over a list of row-chunks of varMatrix, so the entire matrix doesn't need to be sent to everyone. It also avoids calls to clusterEvalQ and clusterExport, simplifying the code as well as making it a bit more efficient.
Upvotes: 4