Carlos Ribeiro

Reputation: 183

R parLapply not parallel

I'm currently developing an R package that will use parallel computing to solve some tasks, by means of the "parallel" package.

I'm getting some really odd behavior when using clusters defined inside functions of my package: parLapply assigns a job to a worker and waits for it to finish before assigning a job to the next worker. Or at least that is what appears to be happening, judging by the log file "cluster.log" and the list of running processes in the Unix shell.

Below is a mockup version of the original function declared inside my package:

.parSolver <- function( varMatrix, var1 ) {

    no_cores <- detectCores()

    #Rows in varMatrix
    rows <- 1:nrow(varMatrix[,])

    # Split rows in n parts
    n <- no_cores
    parts <- split(rows, cut(rows, n))

    # Initiate cluster
    cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
    clusterEvalQ(cl, library(raster))
    clusterExport(cl, "varMatrix", envir=environment())
    clusterExport(cl, "var1", envir=environment())


    rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
        part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
        print(x)
        return(part)
        })

    do.call(merge, rParts)
}

NOTES:

The weird part to me is that if I execute the exact same code as the parSolver function in the global environment, everything works smoothly: all workers take one job at the same time and the task completes in no time. However, if I do something like:

library(myPackage)

varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)

the described problem appears.

It appears to be a load-balancing problem, but that does not explain why it works fine in one situation and not in the other.

Am I missing something here? Thanks in advance.

Upvotes: 4

Views: 2047

Answers (2)

user3005996

Reputation: 21

A complement to Steve Weston's excellent answer: if you must, you can define a function f0 within a function f1 and then run f0 in parallel from within f1, but you have to set f0's environment properly:

f1 <- function(x, cl) {
  ...
  f0 <- function(y) { ... }
  environment(f0) <- .GlobalEnv
  res <- parLapply(cl, x, f0)
  ...
}

This "almost" always works for me, in the sense that I get runtimes close to if not identical to what I would get if running the function's code in the main environment. I say almost because I have encountered at least one case where it didn't which I never managed to solve.

I also second the advice not to export the whole of varMatrix to each worker. A (much) faster solution is to break it into as many chunks as you have workers, serialize those chunks to temporary files on disk, and have each worker un-serialize only its own chunk.
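
As a rough, hypothetical sketch of that idea (the stand-in matrix, the file handling, and the per-chunk processing are mine; it also assumes the workers share a filesystem with the master, as they do with a local PSOCK cluster):

library(parallel)

varMatrix <- matrix(runif(1e6), nrow = 1000)   # stand-in for the real data

cl <- makePSOCKcluster(4)

# One chunk of rows per worker, written to a temporary RDS file on disk
# instead of being exported over the socket connection
chunks <- splitIndices(nrow(varMatrix), length(cl))
files <- vapply(chunks, function(i) {
  f <- tempfile(fileext = ".rds")
  saveRDS(varMatrix[i, , drop = FALSE], f)
  f
}, character(1))

# Each worker reads back only its own chunk
rParts <- clusterApply(cl, files, function(f) {
  mat <- readRDS(f)
  mat * 2   # placeholder for the real per-chunk work
})

result <- do.call(rbind, rParts)
stopCluster(cl)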

Upvotes: 0

Steve Weston

Reputation: 19677

I don't think parLapply is running sequentially. More likely, it's just running inefficiently, making it appear to run sequentially.

I have a few suggestions to improve it:

  • Don't define the worker function inside parSolver
  • Don't export all of varMatrix to each worker
  • Create the cluster outside of parSolver

The first point is important, because as your example now stands, all of the variables defined in parSolver will be serialized along with the anonymous worker function and sent to the workers by parLapply. By defining the worker function outside of any function, the serialization won't capture any unwanted variables.
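
A rough way to see this effect (the numbers are illustrative and will vary): serializing a closure also serializes its enclosing environment, which is exactly what parLapply does when shipping the worker function.

f_inner <- function() {
  big <- matrix(0, 2000, 2000)        # ~30 MB local variable
  f <- function(x) x + 1              # closure over this function's frame
  length(serialize(f, NULL))          # large: 'big' is captured and travels along
}

f_top <- function(x) x + 1            # defined at top level
length(serialize(f_top, NULL))        # small: the global env is sent by reference
f_inner()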

The second point avoids unnecessary socket I/O and uses less memory, making the code more scalable.

Here's a fake but self-contained example, similar to yours, that demonstrates my suggestions:

# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
    library(raster)
    mat * var1
}

parSolver <- function(cl, varMatrix, var1) {
    parts <- splitIndices(nrow(varMatrix), length(cl))
    varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
    rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
    do.call(rbind, rParts)
}

library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)

Note that this takes advantage of the clusterApply function to iterate over a list of row-chunks of varMatrix so that the entire matrix doesn't need to be sent to everyone. It also avoids calls to clusterEvalQ and clusterExport, simplifying the code, as well as making it a bit more efficient.

Upvotes: 4
