Axel971

Reputation: 43

How to do parallel computing inside a cluster with the R future package?

I would like to distribute jobs (from a for loop) across the nodes of a cluster (several machines). I am trying to use the R package future to do that. I don't know whether it's the best way to do it; I also tried foreach with the doParallel package, but I didn't succeed. How can I figure out when the number of the loop iterations is greater than the number of cluster nodes?

library(doParallel);
library(doFuture);
#library(future);

registerDoFuture();

workers <- c(rep("129.20.25.61",1), rep("129.20.25.217",1));
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "", verbose = FALSE);

plan(cluster, workers = cl)

mu <- 1.0
sigma <- 2.0

for(i in 1:3){
 res %<-%{ rnorm(i, mean = mu, sd = sigma)}
 print(i);
}

Upvotes: 3

Views: 2416

Answers (1)

HenrikB

Reputation: 6805

If you use the plain Future API, i.e. future() + value() or %<-%, there is no need to involve foreach, doFuture etc. Here is how to use the Future API by itself and what output you can expect:

(A) Set up the workers

library("future")

workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
### starting worker pid=20026 on localhost:11900 at 11:47:28.334
### starting worker pid=12291 on localhost:11901 at 11:47:37.172

print(cl)
### socket cluster with 2 nodes on hosts '129.20.25.61', '129.20.25.217'

plan(cluster, workers = cl)

(B) Explicit Future API

Here we create a list of futures explicitly using future() and retrieve their values using values() (essentially equivalent to calling lapply(f, FUN = value)).

mu <- 1.0
sigma <- 2.0

f <- list()
for (i in 1:3) {
  f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) })
}
v <- values(f)
str(v)
### List of 3
###  $ : num 3.25
###  $ : num [1:2] 3.24 3.29
###  $ : num [1:3] 1.251 2.299 0.923
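
The same explicit pattern can be tried without remote machines. The following is a minimal sketch, assuming two local background R sessions (plan(multisession)) stand in for the cluster nodes; it uses lapply(f, FUN = value) to collect the results and resolved() to poll without blocking. The seed = TRUE argument is an addition not in the original answer; it asks future to use parallel-safe random number streams.

```r
library("future")

## Assumption: local background sessions replace the two remote workers
plan(multisession, workers = 2)

mu <- 1.0
sigma <- 2.0

f <- list()
for (i in 1:3) {
  ## seed = TRUE: each future draws from its own parallel-safe RNG stream
  f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) }, seed = TRUE)
}

## resolved() checks whether a future is finished without waiting for it
done <- vapply(f, FUN = resolved, FUN.VALUE = logical(1))

## value() blocks until the corresponding future has finished
v <- lapply(f, FUN = value)
str(v)

plan(sequential)  ## shut down the background workers
```

Switching back to plan(cluster, workers = cl) should leave the loop itself unchanged, since only the plan decides where the futures are evaluated.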

(C) Implicit Future API

In this alternative, we create futures implicitly using the future assignment operator %<-% (which internally calls future() and then value() when you try to access the future's value). Since %<-% can only assign to environments (and not to lists, data.frames, etc.), we need to use a container that is an environment. Here I use the listenv class, which is an environment but lets you index it like a list.

library("listenv")  ## listenv()
mu <- 1.0
sigma <- 2.0

v <- listenv()
for (i in 1:3) {
  v[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
}
v <- as.list(v)
str(v)
### List of 3
###  $ : num 1.15
###  $ : num [1:2] 2.2277 -0.0164
###  $ : num [1:3] -2.09 3.34 -1.09

(D) Use future_lapply()

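If you prefer an lapply()-like approach, you can use future_lapply(), which in current releases is provided by the future.apply package:

library("future.apply")  ## future_lapply()
v <- future_lapply(1:3, FUN = function(i) {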
  rnorm(i, mean = mu, sd = sigma)
})
str(v)
### List of 3
###  $ : num 2.12
###  $ : num [1:2] 2.56 -1.21
###  $ : num [1:3] 2.89 -0.159 -0.983
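
Because the function draws random numbers, it may be worth fixing the seed so that results are reproducible regardless of how the elements are split across workers. A minimal sketch, assuming the future.apply package is installed and using plan(sequential) so it runs anywhere; the helper name draw() and the seed 42L are illustrative only:

```r
library("future")
library("future.apply")  ## future_lapply()

## Assumption: sequential plan for a self-contained run; any plan should work
plan(sequential)

mu <- 1.0
sigma <- 2.0

## draw() is a hypothetical helper used only to repeat the same call twice
draw <- function() {
  future_lapply(1:3, FUN = function(i) {
    rnorm(i, mean = mu, sd = sigma)
  }, future.seed = 42L)  ## fixed seed for parallel-safe RNG streams
}

v1 <- draw()
v2 <- draw()
identical(v1, v2)  ## the two runs give the same numbers
```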

(E) Using foreach()

If you'd like to use foreach(), you can do as follows. Note that, by design of foreach, it's best to always export global variables explicitly; however, if you always use doFuture, this is actually not needed because globals are identified automatically.

library("doFuture")
registerDoFuture()
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
plan(cluster, workers = cl)

v <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
  rnorm(i, mean = mu, sd = sigma)
}
str(v)
### List of 3
###  $ : num 3.12
###  $ : num [1:2] -0.0887 -2.8016
###  $ : num [1:3] 2.15 3.5 -2.24

How can I figure out when the number of the loop iterations is greater than the number of cluster nodes?

I'm not sure what you're asking here. Are you concerned about having more futures running at one time than you have workers? If so, that is taken care of automatically. If all workers are occupied, then the creation of additional futures will block until one of the workers is available again.
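
The blocking behaviour can be observed with a small experiment. This is only a sketch, assuming two local background sessions (plan(multisession, workers = 2)) in place of the two remote machines; each future sleeps for one second and returns the process ID of the worker that ran it.

```r
library("future")

## Assumption: two local background sessions stand in for the two nodes
plan(multisession, workers = 2)
nbrOfWorkers()

f <- list()
for (i in 1:4) {
  t0 <- Sys.time()
  f[[i]] <- future({ Sys.sleep(1); Sys.getpid() })
  ## With two workers, creating futures 3 and 4 has to wait for a free worker
  waited <- as.numeric(difftime(Sys.time(), t0, units = "secs"))
  message(sprintf("future %d created after %.1f s", i, waited))
}

pids <- unlist(lapply(f, FUN = value))
length(unique(pids))  ## at most 2: four futures shared two worker processes

plan(sequential)  ## shut down the background workers
```

The first two futures should be created almost immediately, while the later ones are expected to wait until an earlier future has finished; nothing extra is needed in the loop to handle more iterations than workers.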

Upvotes: 5
